155 lines
6.2 KiB
Python
155 lines
6.2 KiB
Python
|
|
# Ivanti API class/wrapper | Evan Compton (P2886385), updated 11/13/2025
|
||
|
|
|
||
|
|
### ! README | IMPORTANT INFORMATION ! ###
|
||
|
|
# requires an "Ivanti_config.ini" file in the same directory
|
||
|
|
# edit "Ivanti_config_template.ini", then save as "Ivanti_config.ini"
|
||
|
|
|
||
|
|
### ? CODE PURPOSE ? ###
|
||
|
|
# the primary purpose of this class/wrapper is to export data as a Pandas Dataframe and/or a CSV file
|
||
|
|
# this class primarily targets these endpoints: host, tag, hostFinding, vulnerability
|
||
|
|
# it should work on other endpoints as well, but the 4 above are the only ones tested
|
||
|
|
# usage examples of this class are at the end of this file
|
||
|
|
|
||
|
|
# library imports
|
||
|
|
import requests, urllib3, configparser, pandas as pd
|
||
|
|
from requests.adapters import HTTPAdapter
|
||
|
|
from urllib3 import Retry
|
||
|
|
|
||
|
|
# fix (ignore) SSL verification...
|
||
|
|
# Charter-specific issue; feel free to fix this if you can...
|
||
|
|
from urllib3.exceptions import InsecureRequestWarning
|
||
|
|
urllib3.disable_warnings(InsecureRequestWarning)
|
||
|
|
|
||
|
|
# Ivanti API class
|
||
|
|
class Ivanti:
    '''
    Wrapper around the Ivanti (RiskSense) REST API.

    Reads platform/auth settings from an INI config file and exposes helpers
    to list an endpoint's filters and exportable fields, and to page through
    "{subject}/search" results into a pandas DataFrame (optionally saved as CSV).
    '''

    def __init__(self, config_file='./Ivanti_config.ini'):
        '''
        Load connection + auth settings and prepare the shared request header.

        :param config_file: Path to the INI config file; must provide
                            [platform] url, api_ver, client_id and
                            [secrets] api_key.
        '''
        # read our config file
        config = configparser.ConfigParser()
        config.read(config_file)

        # set up environment & auth
        platform = config.get('platform', 'url') + config.get('platform', 'api_ver')
        api_key = config.get('secrets', 'api_key')
        self.CLIENT_ID = config.get('platform', 'client_id')
        self.URL_BASE = f'{platform}/client/{self.CLIENT_ID}'

        # universal header for our requests
        self.header = {
            'x-api-key': api_key,
            'content-type': 'application/json'
        }

        # caches for filters and fields, keyed by endpoint name ('tag', 'host', ...)
        self.filters = {}
        self.fields = {}

    # BUGFIX: this was originally declared without `self`, so calls like
    # `self.request()` silently passed the instance in as `max_retries` and
    # handed it to Retry(total=...). @staticmethod keeps every existing
    # `self.request()` call site working while giving the parameters their
    # intended default values.
    @staticmethod
    def request(max_retries=5, backoff_factor=0.5, status_forcelist=(419, 429)):
        """
        Create a Requests session that uses automatic retries.

        :param max_retries: Maximum number of retries to attempt
        :type max_retries: int
        :param backoff_factor: Backoff factor used to calculate time between retries.
        :type backoff_factor: float
        :param status_forcelist: A tuple containing the response status codes that should trigger a retry.
        :type status_forcelist: tuple
        :return: Requests Session
        :rtype: Requests Session Object
        """
        session = requests.Session()
        retry = Retry(
            total=max_retries,
            read=max_retries,
            connect=max_retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
        # only HTTPS is mounted; the Ivanti platform URL is expected to be https
        session.mount('https://', HTTPAdapter(max_retries=retry))
        return session

    # retrieve all filters for an endpoint (tag, host, etc)
    def get_filters(self, endp='tag'):
        '''
        Fetch and cache the available search filters for an endpoint.

        :param endp: Endpoint name; host, tag, etc. (default: "tag")
        :return: Parsed JSON filter listing (also cached in self.filters[endp])
        '''
        url_filters = f'{self.URL_BASE}/{endp}/filter'
        # verify=False is deliberate — see the SSL note at the top of this file
        self.last_resp = self.request().get(url_filters, headers=self.header, verify=False)
        self.filters[endp] = self.last_resp.json()
        return self.filters[endp]

    # retrieve all fields for an endpoint (tag, host, etc)
    def get_fields(self, endp='tag'):
        '''
        Fetch and cache the exportable fields for an endpoint.

        :param endp: Endpoint name; host, tag, etc. (default: "tag")
        :return: List of exportable field descriptors (also cached in self.fields[endp])
        '''
        url_fields = f'{self.URL_BASE}/{endp}/export/template'
        self.last_resp = self.request().get(url_fields, headers=self.header, verify=False)
        self.fields[endp] = self.last_resp.json()['exportableFields']
        return self.fields[endp]

    # this uses the "{subject}/search" endpoint instead of "{subject}/export"
    def search(self, endp='tag', save=None, pages=None, size=750):
        '''
        Uses the "/client/{client_id}/{subject}/search" endpoint to export data as JSON.

        :param endp: String for endpoint name; host, tag, group, etc. (default: "tag")
        :param save: String for filename to save, end with ".csv" (default: none)
        :param pages: Integer to limit the number of pages to pull (default: all pages)
        :param size: Integer defining how many records to pull per page (default: 750 records)
        :return: Pandas DataFrame
        :raises Exception: If any page request does not return HTTP 200.
        '''
        # most endpoints follow the same URL structure and usage pattern
        # filters and fields don't matter for searches - only for exports!
        url_search = f'{self.URL_BASE}/{endp}/search'
        body = {
            'projection': 'basic',  # can also be set to 'detail'
            'sort': [
                {
                    'field': 'id',
                    'direction': 'ASC'
                }
            ],
            'page': 0,
            'size': size
        }

        # post a search, get first page
        resp = self.request().post(url_search, headers=self.header, json=body, verify=False)
        if resp.status_code != 200:
            raise Exception(f'[!] ERROR: Search failed.\n- code: {resp.status_code}\n- text: {resp.text}')
        payload = resp.json()
        totalPages = int(payload['page']['totalPages'])
        totalRecords = int(payload['page']['totalElements'])
        body['page'] = int(payload['page']['number']) + 1

        msg = f'[?] Search requested for "{endp}"\n[?] Total pages: {totalPages}\n[?] Total records: {totalRecords}\n[?] Batch size: {size}'
        if pages:
            msg += f'\n[?] Page limit: {pages} pages'
        print(msg)

        # limit results?
        if pages:
            totalPages = pages

        # naive pluralization for the '_embedded' key:
        # 'vulnerability' -> 'vulnerabilities', 'host' -> 'hosts'
        subject = f'{endp[:-1]}ies' if endp.endswith('y') else f'{endp}s'

        # BUGFIX: the original threw away the page-0 records (it only used the
        # first response for pagination metadata, then looped from page 1).
        # Seed `data` from the first payload; '_embedded' may be absent when
        # there are zero results, hence .get().
        data = list(payload.get('_embedded', {}).get(subject, []))

        # loop until the last page
        while body['page'] < totalPages:
            resp = self.request().post(url_search, headers=self.header, json=body, verify=False)
            # fail loudly on a bad page instead of crashing on a missing key
            if resp.status_code != 200:
                raise Exception(f'[!] ERROR: Search failed.\n- code: {resp.status_code}\n- text: {resp.text}')
            payload = resp.json()
            body['page'] = int(payload['page']['number']) + 1
            data.extend(payload['_embedded'][subject])
            print(f'[?] Page progress: [{body["page"]}/{totalPages}] ({len(data)} total records retrieved)\r', end='')
        print(f'\n[+] Search completed. {len(data)} records retrieved!')

        # make a nice dataframe, save file if wanted, return the frame
        df = pd.DataFrame(data)
        if save:
            df.to_csv(save, index=False)
        return df
|
||
|
|
|
||
|
|
### ? EXAMPLE USAGE ? ###
|
||
|
|
# configure the connection and auth, create an instance object
|
||
|
|
#API = Ivanti('./Ivanti_config.ini')
|
||
|
|
|
||
|
|
# the "search" function goes to the "/client/{clientID}/{subject}/search" endpoint
|
||
|
|
#df = API.search('host', save='IvantiHostsTest_5pages.csv', pages=5)
|
||
|
|
#df = API.search('tag', save='IvantiTagsTest_5pages.csv', pages=5)
|
||
|
|
#df = API.search('hostFinding', save='IvantiHostFindingsTest_5pages.csv', pages=5)
|
||
|
|
#df = API.search('vulnerability', save='IvantiVulnerabilitiesTest_5pages.csv', pages=5)
|
||
|
|
|
||
|
|
# you can also retrieve all possible filters and exportable fields per subject
|
||
|
|
#fields = API.get_fields('host')
|
||
|
|
#filters = API.get_filters('tag')
|