Added knowledge base enhancements for documentation viewing and preloaded Ivanti config for next feature

This commit is contained in:
2026-02-13 09:43:09 -07:00
parent 6fda7de7a3
commit 79a1a23002
11 changed files with 2344 additions and 33 deletions

155
ivantiAPI.py Normal file
View File

@@ -0,0 +1,155 @@
# Ivanti API class/wrapper | Evan Compton (P2886385), updated 11/13/2025
### ! README | IMPORTANT INFORMATION ! ###
# requires an "Ivanti_config.ini" file in the same directory
# edit "Ivanti_config_template.ini", then save as "Ivanti_config.ini"
### ? CODE PURPOSE ? ###
# the primary purpose of this class/wrapper is to export data as a Pandas Dataframe and/or a CSV file
# this class primarily targets these endpoints: host, tag, hostFinding, vulnerability
# it should work on other endpoints as well, but the 4 above are the only ones tested
# usage examples of this class are at the end of this file
# library imports
import requests, urllib3, configparser, pandas as pd
from requests.adapters import HTTPAdapter
from urllib3 import Retry
# fix (ignore) SSL verification...
# Charter-specific issue; feel free to fix this if you can...
from urllib3.exceptions import InsecureRequestWarning
urllib3.disable_warnings(InsecureRequestWarning)
# Ivanti API class
class Ivanti:
def __init__(self, config_file='./Ivanti_config.ini'):
# read our config file
config = configparser.ConfigParser()
config.read(config_file)
# set up environment & auth
PLATFORM = config.get('platform', 'url') + config.get('platform', 'api_ver')
IVANTI_API_KEY = config.get('secrets', 'api_key')
self.CLIENT_ID = config.get('platform', 'client_id')
self.URL_BASE = f'{PLATFORM}/client/{self.CLIENT_ID}'
# universal header for our requests
self.header = {
'x-api-key': IVANTI_API_KEY,
'content-type': 'application/json'
}
# dictionaries for filters and fields, sorted with keys by endpoint prefixes
self.filters = {}
self.fields = {}
return
# function used for HTTP requests- thank you, Ivanti... useful code
def request(max_retries=5, backoff_factor=0.5, status_forcelist=(419,429)):
"""
Create a Requests session that uses automatic retries.
:param max_retries: Maximum number of retries to attempt
:type max_retries: int
:param backoff_factor: Backoff factor used to calculate time between retries.
:type backoff_factor: float
:param status_forcelist: A tuple containing the response status codes that should trigger a retry.
:type status_forcelist: tuple
:return: Requests Session
:rtype: Requests Session Object
"""
session = requests.Session()
retry = Retry(
total=max_retries,
read=max_retries,
connect=max_retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)
return session
# retrieve all filters for an endpoint (tag, host, etc)
def get_filters(self, endp='tag'):
URL_FILTERS = f'{self.URL_BASE}/{endp}/filter'
self.last_resp = self.request().get(URL_FILTERS, headers=self.header, verify=False)
self.filters[endp] = self.last_resp.json()
return self.filters[endp]
# retrieve all fields for an endpoint (tag, host, etc)
def get_fields(self, endp='tag'):
URL_FIELDS = f'{self.URL_BASE}/{endp}/export/template'
self.last_resp = self.request().get(URL_FIELDS, headers=self.header, verify=False)
self.fields[endp] = self.last_resp.json()['exportableFields']
return self.fields[endp]
# this uses the "{subject}/search" endpoint instead of "{subject}/export"
def search(self, endp='tag', save=None, pages=None, size=750):
'''
Uses the "/client/{client_id}/{subject}/search" endpoint to export data as JSON.
:param endp: String for endpoint name; host, tag, group, etc. (default: "tag")
:param save: String for filename to save, end with ".csv" (default: none)
:param pages: Integer to limit the number of pages to pull (default: all pages)
:param size: Integer defining how many records to pull per page (default: 750 records)
:return: Pandas DataFrame
'''
# most endpoints follow the same URL structure and usage pattern
# filters and fields dont matter for searches- only for exports!
URL_SEARCH = f'{self.URL_BASE}/{endp}/search'
body = {
'projection': 'basic', # can also be set to 'detail'
'sort': [
{
'field': 'id',
'direction': 'ASC'
}
],
'page': 0,
'size': size
}
# post a search, get first page
resp = self.request().post(URL_SEARCH, headers=self.header, json=body, verify=False)
if resp.status_code != 200:
raise Exception(f'[!] ERROR: Search failed.\n- code: {resp.status_code}\n- text: {resp.text}')
totalPages = int(resp.json()['page']['totalPages'])
totalRecords = int(resp.json()['page']['totalElements'])
body['page'] = int(resp.json()['page']['number']) + 1
msg = f'[?] Search requested for "{endp}"\n[?] Total pages: {totalPages}\n[?] Total records: {totalRecords}\n[?] Batch size: {size}'
if pages:
msg += f'\n[?] Page limit: {pages} pages'
print(msg)
# limit results?
if pages:
totalPages = pages
# loop until the last page
subject = f'{endp[:-1]}ies' if endp.endswith('y') else f'{endp}s'
data = []
while body['page'] < totalPages:
resp = self.request().post(URL_SEARCH, headers=self.header, json=body, verify=False)
body['page'] = int(resp.json()['page']['number']) + 1
data.extend(resp.json()['_embedded'][subject])
print(f'[?] Page progress: [{body["page"]}/{totalPages}] ({len(data)} total records retrieved)\r', end='')
print(f'\n[+] Search completed. {len(data)} records retrieved!')
# make a nice dataframe, save file if wanted, return the frame
df = pd.DataFrame(data)
if save:
df.to_csv(save, index=False)
return df
### ? EXAMPLE USAGE ? ###
# configure the connection and auth, create an instance object
#API = Ivanti('./Ivanti_config.ini')
# the "search" function goes to the "/client/{clientID}/{subject}/search" endpoint
#df = API.search('host', save='IvantiHostsTest_5pages.csv', pages=5)
#df = API.search('tag', save='IvantiTagsTest_5pages.csv', pages=5)
#df = API.search('hostFinding', save='IvantiHostFindingsTest_5pages.csv', pages=5)
#df = API.search('vulnerability', save='IvantiVulnerabilitiesTest_5pages.csv', pages=5)
# you can also retrieve all possible filters and exportable fields per subject
#filters = API.get_fields('host')
#fields = API.get_filters('tag')