# QRadar API call with Python
# Warren Perez
import json
import time
from urllib.parse import quote

import requests
##################################### VARIABLES #####################################
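# HTTP headers sent with every QRadar API request.
# "SEC" carries the API token, i.e. a credential: keep the real value out of
# version control (load it from an environment variable or secrets store at
# run time) and give the token a QRadar role limited to the Ariel search
# permissions this script needs.
headers = {
    "Version": "<API Version>",
    "SEC": "<TOKEN>",
    "Accept": "application/json",
    "Content-Type": "application/json",
}
# Host name or IP of the QRadar console: host part only, no scheme or path.
QRadarURL = "<IP>"
# Search
search_query = "<AQL query>"
searchUrl = "https://" + QRadarURL + "/api/ariel/searches?query_expression="
# The AQL text is a query-string value, so it must be percent-encoded.
# safe="" encodes '&', '=', '+', '/' and spaces too, so characters inside the
# AQL cannot be read as URL structure or be mangled on the way to the server.
searchQueryURL = searchUrl + quote(search_query, safe="")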
# First create the search (POST); then use the returned query ID to poll its status and fetch the results.
##################################### FUNCTIONS #####################################
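def queryQRadarAPI(url, headers, *, verify=False, timeout=None):
    """Create an Ariel search by POSTing to *url* and return the decoded JSON body.

    Args:
        url: Full search URL, including the (already URL-encoded) AQL query.
        headers: Headers dict carrying the SEC token, API version and the JSON
            content-negotiation headers.
        verify: Passed straight through to requests. The default ``False``
            keeps the original behaviour of skipping TLS certificate
            verification (the console's certificate could not be verified);
            that also lets anyone able to intercept the connection read the
            token, so pass ``True`` or the path to the console's CA bundle
            whenever possible.
        timeout: Seconds to wait for the console. ``None`` (the default) waits
            indefinitely, as before; a finite value keeps the script from
            hanging on an unreachable host.

    Returns:
        The response body parsed as JSON. The body is returned as-is even for
        HTTP error statuses, so callers must check it (e.g. for ``cursor_id``)
        before using it.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If the body is not valid JSON.
    """
    response = requests.post(url, headers=headers, verify=verify, timeout=timeout)
    return response.json()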
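def getResults(queryID, headers, *, verify=False, timeout=None):
    """Fetch the result set of a finished Ariel search and return the raw response.

    Args:
        queryID: The search's ``cursor_id``. Accepted as str or int; it is
            converted and percent-encoded so it can only ever form one path
            segment of the URL.
        headers: Headers dict carrying the SEC token and API version.
        verify: TLS verification flag handed to requests; ``False`` (the
            original behaviour) skips certificate checks, prefer ``True`` or a
            CA bundle path.
        timeout: Seconds to wait for the console; ``None`` waits indefinitely.

    Returns:
        The ``requests.Response`` object, not decoded: call ``.json()`` on it.
        The requested URL is also printed, as in the original script.
    """
    url = ("https://" + QRadarURL + "/api/ariel/searches/"
           + quote(str(queryID), safe="") + "/results")
    response = requests.get(url, headers=headers, verify=verify, timeout=timeout)
    print(url)
    return response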
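def getQueryStatus(queryID, *, verify=False, timeout=None):
    """Return the search's status document once it has finished, else ``None``.

    Uses the module-level ``headers`` (unlike its siblings, which take them as
    an argument). The status document carries fields such as ``status`` and
    ``query_execution_time``.

    Args:
        queryID: The search's ``cursor_id`` (str or int); percent-encoded so
            it forms exactly one path segment.
        verify: TLS verification flag handed to requests; ``False`` (the
            original behaviour) skips certificate checks.
        timeout: Seconds to wait for the console; ``None`` waits indefinitely.

    Returns:
        The decoded JSON status document when ``status`` is ``"COMPLETED"``,
        otherwise ``None`` -- callers must poll again (or give up) on ``None``
        rather than index into the result.

    Raises:
        KeyError: If the body has no ``status`` field (e.g. an API error body).
    """
    url = "https://" + QRadarURL + "/api/ariel/searches/" + quote(str(queryID), safe="")
    response = requests.get(url, headers=headers, verify=verify, timeout=timeout)
    jsonRes = response.json()
    if jsonRes['status'] == "COMPLETED":
        return jsonRes
    # Still running (or cancelled/failed): make the "not finished" result explicit.
    return None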
##################################### END FUNCTIONS #####################################
##################################### MAIN #####################################
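# Run query
jsonResponse = queryQRadarAPI(searchQueryURL, headers)
# Get cursor of the query. A missing cursor_id means the search was not
# created (bad token, bad AQL, ...) and the body describes the error instead.
cursorID = jsonResponse.get('cursor_id')
if cursorID is None:
    raise SystemExit("Search was not created: " + json.dumps(jsonResponse))
# Poll the status until the search has COMPLETED. getQueryStatus returns None
# while the search is still running, so a single fixed sleep is not enough;
# a bounded number of attempts keeps a stuck search from hanging the script.
POLL_INTERVAL_SECONDS = 5
MAX_POLL_ATTEMPTS = 60
jsonResults = None
for _attempt in range(MAX_POLL_ATTEMPTS):
    time.sleep(POLL_INTERVAL_SECONDS)
    jsonResults = getQueryStatus(cursorID)
    if jsonResults is not None:
        break
if jsonResults is None:
    raise SystemExit("Search " + str(cursorID) + " did not complete after "
                     + str(MAX_POLL_ATTEMPTS * POLL_INTERVAL_SECONDS) + " seconds")
query_execution_time = jsonResults['query_execution_time']
print ("Search Status: ",query_execution_time)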
##################################### END MAIN #####################################