feat(compliance): add AEO compliance tracking backend
- Migration: compliance_uploads, compliance_items, compliance_notes tables with indexes on (hostname, metric_id) identity key and team/status - Python parser (parse_compliance_xlsx.py): reads NTS_AEO xlsx, extracts non-compliant assets from all detail sheets, parses Summary sheet for metric health data and overall scores, outputs JSON to stdout - Route (/api/compliance): preview/commit upload flow with diff summary, items endpoint grouped by hostname with seen_count tracking, metric summary endpoint for health cards, notes endpoints keyed on (hostname, metric_id) persisting across uploads - server.js: register compliance router at /api/compliance - .gitignore: exclude planning docs and xlsx source files
This commit is contained in:
212
backend/scripts/parse_compliance_xlsx.py
Normal file
212
backend/scripts/parse_compliance_xlsx.py
Normal file
@@ -0,0 +1,212 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Parse NTS_AEO compliance xlsx file and write JSON to stdout.
|
||||
Usage: python3 parse_compliance_xlsx.py <path_to_xlsx>
|
||||
|
||||
Output:
|
||||
{
|
||||
"items": [...], # non-compliant asset rows
|
||||
"summary": { ... }, # metric health data from Summary sheet
|
||||
"report_date": "YYYY-MM-DD" | null,
|
||||
"total": int
|
||||
}
|
||||
"""
|
||||
import sys
|
||||
import json
|
||||
import re
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
|
||||
METRIC_CATEGORIES = {
|
||||
'2.3.4i': 'Vulnerability Management',
|
||||
'2.3.6i': 'Vulnerability Management',
|
||||
'2.3.8i': 'Vulnerability Management',
|
||||
'5.2.4': 'Access & MFA',
|
||||
'5.2.5': 'Access & MFA',
|
||||
'5.2.6': 'Access & MFA',
|
||||
'5.3.4': 'Endpoint Protection',
|
||||
'5.5.2': 'End-of-Life OS',
|
||||
'5.5.4i': 'Vulnerability Management',
|
||||
'5.5.5': 'Decommissioned Assets',
|
||||
'5.8.1': 'Application Security',
|
||||
'7.1.1': 'Logging & Monitoring',
|
||||
'7.6.13': 'Disaster Recovery',
|
||||
'7.6.16': 'Disaster Recovery',
|
||||
'Missing_AppID': 'Asset Data Quality',
|
||||
'Missing_DF': 'Asset Data Quality',
|
||||
'Missing_OS': 'Asset Data Quality',
|
||||
}
|
||||
|
||||
# Columns that go into the main item fields — everything else becomes extra_json
|
||||
CORE_COLS = {
|
||||
'Preferred - Hostname', 'GRANITE - IPv4_Address', 'GRANITE - Type',
|
||||
'Team', 'Compliant', 'Source_Network', 'Vertical',
|
||||
'GRANITE - Equip_Inst_ID', 'GRANITE - RESPONSIBLE_TEAM',
|
||||
}
|
||||
|
||||
SKIP_SHEETS = {'Summary', 'CMDB_9box'}
|
||||
|
||||
|
||||
def safe_str(val):
|
||||
s = str(val).strip()
|
||||
return '' if s == 'nan' else s
|
||||
|
||||
|
||||
def parse_summary(xl):
|
||||
"""Return { entries: [...], overall_scores: { customer_network, vertical } }"""
|
||||
df_raw = pd.read_excel(xl, sheet_name='Summary', header=None)
|
||||
|
||||
overall_scores = {
|
||||
'customer_network': float(df_raw.iloc[0, 4]) if pd.notna(df_raw.iloc[0, 4]) else None,
|
||||
'vertical': float(df_raw.iloc[1, 4]) if pd.notna(df_raw.iloc[1, 4]) else None,
|
||||
}
|
||||
|
||||
df = pd.read_excel(xl, sheet_name='Summary', header=3)
|
||||
# Flatten any newlines in column names
|
||||
df.columns = [str(c).replace('\n', ' ').strip() for c in df.columns]
|
||||
|
||||
# Locate the sub-vertical/team column robustly
|
||||
team_col = next((c for c in df.columns if 'Sub-Vertical' in c or 'Purchase Group' in c), None)
|
||||
|
||||
entries = []
|
||||
for _, row in df.iterrows():
|
||||
metric_id = safe_str(row.get('Metric', ''))
|
||||
if not metric_id or metric_id in ('Metric',):
|
||||
continue
|
||||
|
||||
team = safe_str(row.get(team_col, '')) if team_col else ''
|
||||
|
||||
try:
|
||||
non_compliant = int(row.get('Non-Compliant', 0) or 0)
|
||||
compliant = int(row.get('Compliant', 0) or 0)
|
||||
total = int(row.get('Total', 0) or 0)
|
||||
compliance_pct = float(row.get('Current Compliance', 0) or 0)
|
||||
target = float(row.get('Metric Target', 0) or 0)
|
||||
except (ValueError, TypeError):
|
||||
continue
|
||||
|
||||
entries.append({
|
||||
'metric_id': metric_id,
|
||||
'team': team,
|
||||
'priority': safe_str(row.get('Priority / Non-Priority / IR', '')),
|
||||
'non_compliant': non_compliant,
|
||||
'compliant': compliant,
|
||||
'total': total,
|
||||
'compliance_pct': compliance_pct,
|
||||
'target': target,
|
||||
'status': safe_str(row.get('Status', '')),
|
||||
'description': safe_str(row.get('Metric Description', '')),
|
||||
'category': METRIC_CATEGORIES.get(metric_id, 'Other'),
|
||||
})
|
||||
|
||||
return {'entries': entries, 'overall_scores': overall_scores}
|
||||
|
||||
|
||||
def parse_sheet(xl, sheet_name, summary_entries):
|
||||
"""Return list of non-compliant item dicts for a detail sheet."""
|
||||
try:
|
||||
df = pd.read_excel(xl, sheet_name=sheet_name, header=0)
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
if df.empty:
|
||||
return []
|
||||
|
||||
df.columns = [str(c).strip() for c in df.columns]
|
||||
|
||||
# Filter to non-compliant rows when the Compliant column exists
|
||||
if 'Compliant' in df.columns:
|
||||
df = df[df['Compliant'] == False]
|
||||
|
||||
if df.empty:
|
||||
return []
|
||||
|
||||
# Look up description from summary
|
||||
metric_desc = ''
|
||||
for e in summary_entries:
|
||||
if e['metric_id'] == sheet_name and e['description']:
|
||||
metric_desc = e['description']
|
||||
break
|
||||
|
||||
category = METRIC_CATEGORIES.get(sheet_name, 'Other')
|
||||
|
||||
items = []
|
||||
for _, row in df.iterrows():
|
||||
hostname = safe_str(row.get('Preferred - Hostname', ''))
|
||||
if not hostname:
|
||||
continue
|
||||
|
||||
ip = safe_str(row.get('GRANITE - IPv4_Address', ''))
|
||||
device_type = safe_str(row.get('GRANITE - Type', ''))
|
||||
team = safe_str(row.get('Team', ''))
|
||||
|
||||
# Everything non-core goes into extra_json
|
||||
extra = {}
|
||||
for col in df.columns:
|
||||
if col in CORE_COLS:
|
||||
continue
|
||||
val = row.get(col)
|
||||
if pd.isna(val) if not isinstance(val, str) else False:
|
||||
continue
|
||||
s = safe_str(val)
|
||||
if s:
|
||||
extra[col] = val.isoformat() if hasattr(val, 'isoformat') else s
|
||||
|
||||
items.append({
|
||||
'hostname': hostname,
|
||||
'ip_address': ip,
|
||||
'device_type': device_type,
|
||||
'team': team,
|
||||
'metric_id': sheet_name,
|
||||
'metric_desc': metric_desc,
|
||||
'category': category,
|
||||
'extra_json': extra,
|
||||
})
|
||||
|
||||
return items
|
||||
|
||||
|
||||
def extract_report_date(filepath):
|
||||
"""Try to pull YYYY-MM-DD from the filename, e.g. NTS_AEO_2026_03_25.xlsx"""
|
||||
stem = Path(filepath).stem
|
||||
m = re.search(r'(\d{4})_(\d{2})_(\d{2})', stem)
|
||||
if m:
|
||||
return f"{m.group(1)}-{m.group(2)}-{m.group(3)}"
|
||||
return None
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print(json.dumps({'error': 'No file path provided'}))
|
||||
sys.exit(1)
|
||||
|
||||
filepath = sys.argv[1]
|
||||
|
||||
try:
|
||||
xl = pd.ExcelFile(filepath)
|
||||
except Exception as e:
|
||||
print(json.dumps({'error': f'Cannot open file: {str(e)}'}))
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
summary = parse_summary(xl)
|
||||
except Exception as e:
|
||||
summary = {'entries': [], 'overall_scores': {}, 'parse_error': str(e)}
|
||||
|
||||
all_items = []
|
||||
for sheet_name in xl.sheet_names:
|
||||
if sheet_name in SKIP_SHEETS:
|
||||
continue
|
||||
items = parse_sheet(xl, sheet_name, summary.get('entries', []))
|
||||
all_items.extend(items)
|
||||
|
||||
print(json.dumps({
|
||||
'items': all_items,
|
||||
'summary': summary,
|
||||
'report_date': extract_report_date(filepath),
|
||||
'total': len(all_items),
|
||||
}))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user