#!/usr/bin/env python3
"""
Parse NTS_AEO compliance xlsx file and write JSON to stdout.

Usage:
    python3 parse_compliance_xlsx.py PATH_TO_XLSX

Output:
    {
      "items": [...],            # non-compliant asset rows
      "summary": { ... },        # metric health data from Summary sheet
      "report_date": "YYYY-MM-DD" | null,
      "total": int
    }

On failure to receive or open the file, a JSON object with a single
"error" key is printed instead and the process exits with status 1.
"""

import sys
import json
import math
import re
from datetime import date
from pathlib import Path

import pandas as pd

METRIC_CATEGORIES = {
    '2.3.4i': 'Vulnerability Management',
    '2.3.6i': 'Vulnerability Management',
    '2.3.8i': 'Vulnerability Management',
    '5.2.4': 'Access & MFA',
    '5.2.5': 'Access & MFA',
    '5.2.6': 'Access & MFA',
    '5.3.4': 'Endpoint Protection',
    '5.5.2': 'End-of-Life OS',
    '5.5.4i': 'Vulnerability Management',
    '5.5.5': 'Decommissioned Assets',
    '5.8.1': 'Application Security',
    '7.1.1': 'Logging & Monitoring',
    '7.6.13': 'Disaster Recovery',
    '7.6.16': 'Disaster Recovery',
    '1.1.1': 'Logging & Monitoring',
    '1.1.3': 'Logging & Monitoring',
    '1.4.1': 'Logging & Monitoring',
    '5.2.7': 'Access & MFA',
    '5.2.8': 'Access & MFA',
    '7.1.4': 'Logging & Monitoring',
    'Missing_AppID': 'Asset Data Quality',
    'Missing_DF': 'Asset Data Quality',
    'Missing_OS': 'Asset Data Quality',
}

# Columns that go into the main item fields — everything else becomes extra_json
CORE_COLS = {
    'Preferred - Hostname',
    'GRANITE - IPv4_Address',
    'GRANITE - Type',
    'Team',
    'Compliant',
    'Source_Network',
    'Vertical',
    'GRANITE - Equip_Inst_ID',
    'GRANITE - RESPONSIBLE_TEAM',
}

SKIP_SHEETS = {'Summary', 'CMDB_9box', 'Vulns', 'Aging Dashboard'}

# Date embedded in the workbook filename, e.g. NTS_AEO_2026_03_25.xlsx
_DATE_IN_NAME = re.compile(r'(\d{4})_(\d{2})_(\d{2})')


def _is_missing(val):
    """Return True when *val* is a missing-value marker (None, NaN, NaT, NA).

    Strings are never treated as missing here.  Non-scalar values, for which
    ``pd.isna`` does not yield a single boolean, are reported as present
    instead of raising.
    """
    if val is None:
        return True
    if isinstance(val, str):
        return False
    try:
        return bool(pd.isna(val))
    except (TypeError, ValueError):
        return False


def safe_str(val):
    """Return *val* as a stripped string, or '' when it is missing.

    Missing covers None/NaN/NaT/NA as well as the literal text 'nan', so
    these markers never leak into the output as 'None', 'NaT' or '<NA>'.
    """
    if _is_missing(val):
        return ''
    text = str(val).strip()
    return '' if text == 'nan' else text


def _to_float(value):
    """Convert a summary cell to a finite float; blank/NaN/inf become 0.0.

    NaN is truthy, so ``value or 0`` alone lets it through, and json.dumps
    would then emit the non-standard token ``NaN``.  Raises ValueError or
    TypeError for values that cannot be converted at all.
    """
    number = float(value or 0)
    return number if math.isfinite(number) else 0.0


def _cell_float(frame, row, col):
    """Return frame.iloc[row, col] as a finite float, or None if unavailable.

    None is returned when the cell is out of range, blank, non-numeric or
    non-finite, so one bad score cell cannot abort the whole summary parse.
    """
    try:
        cell = frame.iloc[row, col]
        if pd.isna(cell):
            return None
        number = float(cell)
    except (IndexError, ValueError, TypeError):
        return None
    return number if math.isfinite(number) else None


def parse_summary(xl):
    """Return { entries: [...], overall_scores: { customer_network, vertical } }

    ``xl`` is the opened workbook (anything ``pd.read_excel`` accepts).  The
    'Summary' sheet is read twice: once without a header to pick the two
    overall scores out of rows 0 and 1 (column index 4), and once with row 3
    as the header to read the per-metric table.

    Rows without a metric id, rows that repeat the header text, and rows
    whose numeric cells cannot be converted are skipped.  Errors from
    reading the sheet itself propagate to the caller.
    """
    df_raw = pd.read_excel(xl, sheet_name='Summary', header=None)
    overall_scores = {
        'customer_network': _cell_float(df_raw, 0, 4),
        'vertical': _cell_float(df_raw, 1, 4),
    }

    df = pd.read_excel(xl, sheet_name='Summary', header=3)
    # Flatten any newlines in column names
    df.columns = [str(c).replace('\n', ' ').strip() for c in df.columns]

    # Locate the sub-vertical/team column robustly
    team_col = next(
        (c for c in df.columns if 'Sub-Vertical' in c or 'Purchase Group' in c),
        None,
    )

    entries = []
    for _, row in df.iterrows():
        metric_id = safe_str(row.get('Metric', ''))
        if not metric_id or metric_id in ('Metric',):
            continue

        team = safe_str(row.get(team_col, '')) if team_col else ''

        try:
            # A blank count cell is NaN, which is truthy, so it reaches int()
            # and raises ValueError: such rows are skipped, as before.
            non_compliant = int(row.get('Non-Compliant', 0) or 0)
            compliant = int(row.get('Compliant', 0) or 0)
            total = int(row.get('Total', 0) or 0)
            compliance_pct = _to_float(row.get('Current Compliance', 0))
            target = _to_float(row.get('Metric Target', 0))
        except (ValueError, TypeError):
            continue

        entries.append({
            'metric_id': metric_id,
            'team': team,
            'priority': safe_str(row.get('Priority / Non-Priority / IR', '')),
            'non_compliant': non_compliant,
            'compliant': compliant,
            'total': total,
            'compliance_pct': compliance_pct,
            'target': target,
            'status': safe_str(row.get('Status', '')),
            'description': safe_str(row.get('Metric Description', '')),
            'category': METRIC_CATEGORIES.get(metric_id, 'Other'),
        })

    return {'entries': entries, 'overall_scores': overall_scores}


def parse_sheet(xl, sheet_name, summary_entries):
    """Return list of non-compliant item dicts for a detail sheet.

    The sheet name doubles as the metric id.  When the sheet has a
    'Compliant' column only rows where it equals False are kept; otherwise
    every row is kept.  Rows without a hostname are dropped.  Columns outside
    CORE_COLS are collected, as strings, under 'extra_json'.

    A sheet that cannot be read yields an empty list (best effort) so that
    one bad sheet does not abort the whole export.
    """
    try:
        df = pd.read_excel(xl, sheet_name=sheet_name, header=0)
    except Exception:
        # Deliberate best-effort: skip unreadable sheets.
        return []

    if df.empty:
        return []

    df.columns = [str(c).strip() for c in df.columns]

    # Filter to non-compliant rows when the Compliant column exists
    if 'Compliant' in df.columns:
        df = df[df['Compliant'] == False]  # noqa: E712 - element-wise compare

    if df.empty:
        return []

    # Look up description from summary (first non-empty match wins)
    metric_desc = next(
        (
            entry['description']
            for entry in summary_entries
            if entry['metric_id'] == sheet_name and entry['description']
        ),
        '',
    )
    category = METRIC_CATEGORIES.get(sheet_name, 'Other')

    # Invariant across rows, so computed once.
    extra_cols = [col for col in df.columns if col not in CORE_COLS]

    items = []
    for _, row in df.iterrows():
        hostname = safe_str(row.get('Preferred - Hostname', ''))
        if not hostname:
            continue

        # Everything non-core goes into extra_json
        extra = {}
        for col in extra_cols:
            val = row.get(col)
            if _is_missing(val):
                continue
            text = safe_str(val)
            if text:
                # Dates/timestamps are emitted in ISO form, not str() form.
                extra[col] = val.isoformat() if hasattr(val, 'isoformat') else text

        items.append({
            'hostname': hostname,
            'ip_address': safe_str(row.get('GRANITE - IPv4_Address', '')),
            'device_type': safe_str(row.get('GRANITE - Type', '')),
            'team': safe_str(row.get('Team', '')),
            'metric_id': sheet_name,
            'metric_desc': metric_desc,
            'category': category,
            'extra_json': extra,
        })

    return items


def extract_report_date(filepath):
    """Try to pull YYYY-MM-DD from the filename, e.g. NTS_AEO_2026_03_25.xlsx

    Returns the first ``YYYY_MM_DD`` group in the file stem that is a real
    calendar date, formatted as 'YYYY-MM-DD', or None when there is none.
    """
    stem = Path(filepath).stem
    for match in _DATE_IN_NAME.finditer(stem):
        year, month, day = (int(part) for part in match.groups())
        try:
            return date(year, month, day).isoformat()
        except ValueError:
            # Digits matched the pattern but are not a valid date (e.g. month 13).
            continue
    return None


def main():
    """Parse the workbook named in argv[1] and print the result as JSON.

    Prints ``{"error": ...}`` and exits with status 1 when no path is given
    or the workbook cannot be opened.  A failure while parsing the Summary
    sheet is reported inside the output under summary['parse_error'] instead
    of aborting, so detail sheets are still exported.
    """
    if len(sys.argv) < 2:
        print(json.dumps({'error': 'No file path provided'}))
        sys.exit(1)

    filepath = sys.argv[1]

    try:
        xl = pd.ExcelFile(filepath)
    except Exception as e:
        print(json.dumps({'error': f'Cannot open file: {str(e)}'}))
        sys.exit(1)

    # The context manager closes the underlying file handle when done.
    with xl:
        try:
            summary = parse_summary(xl)
        except Exception as e:
            summary = {'entries': [], 'overall_scores': {}, 'parse_error': str(e)}

        all_items = []
        for sheet_name in xl.sheet_names:
            if sheet_name in SKIP_SHEETS:
                continue
            all_items.extend(parse_sheet(xl, sheet_name, summary.get('entries', [])))

    print(json.dumps({
        'items': all_items,
        'summary': summary,
        'report_date': extract_report_date(filepath),
        'total': len(all_items),
    }))


if __name__ == '__main__':
    main()