Files
cve-dashboard/backend/scripts/parse_compliance_xlsx.py

209 lines
6.3 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Parse NTS_AEO compliance xlsx file and write JSON to stdout.
Usage: python3 parse_compliance_xlsx.py <path_to_xlsx>
Output:
    {
        "items": [...],                      # non-compliant asset rows
        "summary": { ... },                  # metric health data from Summary sheet
        "report_date": "YYYY-MM-DD" | null,
        "total": int
    }
"""
import datetime
import json
import os
import re
import sys
from pathlib import Path

import pandas as pd
def load_config():
    """Load parser configuration from compliance_config.json.

    The file is looked up in the directory that contains this script, not in
    the current working directory.

    Returns:
        The decoded JSON document. The rest of the module reads the keys
        ``metric_categories``, ``core_cols`` and ``skip_sheets`` from it.

    On failure (file missing, not valid UTF-8, or not valid JSON) a message
    is written to stderr and the process exits with status 1.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(script_dir, 'compliance_config.json')
    try:
        # JSON is UTF-8 by specification; do not rely on the locale's default
        # encoding, which mis-decodes non-ASCII text on some hosts.
        with open(config_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except FileNotFoundError:
        print(f"Error: Configuration file not found: {config_path}", file=sys.stderr)
        sys.exit(1)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # Undecodable bytes are reported the same way as malformed JSON so
        # the caller gets a clean exit code instead of a traceback.
        print(f"Error: Invalid JSON in configuration file {config_path}: {e}", file=sys.stderr)
        sys.exit(1)
    return config
# Parser configuration, loaded once at import time. load_config() terminates
# the process if the file is missing or malformed.
_config = load_config()
# Metric id -> category label; ids not listed are reported as 'Other'.
METRIC_CATEGORIES = _config['metric_categories']
# Detail-sheet column names that are kept out of each item's extra_json.
CORE_COLS = set(_config['core_cols'])
# Sheet names that main() does not parse as detail sheets.
SKIP_SHEETS = set(_config['skip_sheets'])
def safe_str(val):
    """Return *val* as a whitespace-stripped string, or '' when it is missing.

    "Missing" covers ``None`` and the textual forms pandas uses for absent
    values: ``nan`` (float NaN), ``NaT`` (missing timestamp) and ``<NA>``
    (``pd.NA``). Without this, blank cells could surface in the output as the
    literal strings 'None', 'NaT' or '<NA>'.
    """
    if val is None:
        return ''
    s = str(val).strip()
    return '' if s in ('nan', 'NaT', '<NA>') else s
def _summary_number(value, cast):
    """Convert a Summary cell with *cast* (int or float); blank cells become 0.

    ``value or 0`` alone does not cover blank cells: pandas reports them as
    NaN, which is truthy. ``int(NaN)`` then raised (dropping the whole row)
    and ``float(NaN)`` leaked NaN into the output, which json.dumps writes as
    the non-standard token ``NaN``.
    """
    if value is None or (not isinstance(value, str) and pd.isna(value)):
        return cast(0)
    return cast(value or 0)


def _summary_score(df_raw, row_idx, col_idx):
    """Return the cell at (row_idx, col_idx) as a float, or None if unusable.

    "Unusable" means the sheet is too small to contain the cell, the cell is
    blank, or its content cannot be converted to a float.
    """
    n_rows, n_cols = df_raw.shape
    if row_idx >= n_rows or col_idx >= n_cols:
        return None
    cell = df_raw.iloc[row_idx, col_idx]
    if pd.isna(cell):
        return None
    try:
        return float(cell)
    except (TypeError, ValueError):
        return None


def parse_summary(xl):
    """Parse the 'Summary' sheet.

    Args:
        xl: Anything ``pd.read_excel`` accepts as its first argument
            (main() passes a ``pd.ExcelFile``).

    Returns:
        ``{'entries': [...], 'overall_scores': {'customer_network', 'vertical'}}``.
        The overall scores are read from fixed positions (rows 0 and 1,
        column 4 of the header-less sheet) and are None when absent. Each
        entry describes one metric row of the table whose header is on
        sheet row index 3.

    Rows without a metric id, repeated header rows, and rows whose numeric
    cells hold non-numeric text are skipped. Blank numeric cells count as 0.
    """
    df_raw = pd.read_excel(xl, sheet_name='Summary', header=None)
    overall_scores = {
        'customer_network': _summary_score(df_raw, 0, 4),
        'vertical': _summary_score(df_raw, 1, 4),
    }

    df = pd.read_excel(xl, sheet_name='Summary', header=3)
    # Flatten any newlines in column names
    df.columns = [str(c).replace('\n', ' ').strip() for c in df.columns]
    # Locate the sub-vertical/team column robustly
    team_col = next(
        (c for c in df.columns if 'Sub-Vertical' in c or 'Purchase Group' in c),
        None,
    )

    entries = []
    for _, row in df.iterrows():
        metric_id = safe_str(row.get('Metric', ''))
        if not metric_id or metric_id == 'Metric':
            continue
        team = safe_str(row.get(team_col, '')) if team_col else ''
        try:
            non_compliant = _summary_number(row.get('Non-Compliant', 0), int)
            compliant = _summary_number(row.get('Compliant', 0), int)
            total = _summary_number(row.get('Total', 0), int)
            compliance_pct = _summary_number(row.get('Current Compliance', 0), float)
            target = _summary_number(row.get('Metric Target', 0), float)
        except (ValueError, TypeError):
            # Non-numeric content in a numeric column: not a data row.
            continue
        entries.append({
            'metric_id': metric_id,
            'team': team,
            'priority': safe_str(row.get('Priority / Non-Priority / IR', '')),
            'non_compliant': non_compliant,
            'compliant': compliant,
            'total': total,
            'compliance_pct': compliance_pct,
            'target': target,
            'status': safe_str(row.get('Status', '')),
            'description': safe_str(row.get('Metric Description', '')),
            'category': METRIC_CATEGORIES.get(metric_id, 'Other'),
        })
    return {'entries': entries, 'overall_scores': overall_scores}
def parse_sheet(xl, sheet_name, summary_entries):
    """Collect the non-compliant rows of one detail sheet as item dicts.

    Args:
        xl: Workbook handle or path passed straight to ``pd.read_excel``.
        sheet_name: Sheet to read; also used as the metric id of every item.
        summary_entries: Entries produced by parse_summary, searched for the
            first non-empty description whose metric id equals *sheet_name*.

    Returns:
        A list of dicts, one per row that has a hostname. An unreadable or
        empty sheet, or one with no non-compliant rows, yields ``[]``.
    """
    try:
        frame = pd.read_excel(xl, sheet_name=sheet_name, header=0)
    except Exception:
        return []
    if frame.empty:
        return []

    frame.columns = [str(name).strip() for name in frame.columns]

    # Sheets with a Compliant column are narrowed to rows where it is False.
    # The element-wise `== False` is intentional: it builds a boolean mask.
    if 'Compliant' in frame.columns:
        frame = frame[frame['Compliant'] == False]  # noqa: E712
        if frame.empty:
            return []

    # First summary entry for this metric that carries a description.
    metric_desc = next(
        (
            entry['description']
            for entry in summary_entries
            if entry['metric_id'] == sheet_name and entry['description']
        ),
        '',
    )
    category = METRIC_CATEGORIES.get(sheet_name, 'Other')

    records = []
    for _, record in frame.iterrows():
        host = safe_str(record.get('Preferred - Hostname', ''))
        if not host:
            continue

        # Every non-core column with a usable value is carried in extra_json.
        extras = {}
        for column in frame.columns:
            if column in CORE_COLS:
                continue
            cell = record.get(column)
            if not isinstance(cell, str) and pd.isna(cell):
                continue
            text = safe_str(cell)
            if not text:
                continue
            # Date-like cells are emitted in ISO format, everything else as text.
            if hasattr(cell, 'isoformat'):
                extras[column] = cell.isoformat()
            else:
                extras[column] = text

        records.append({
            'hostname': host,
            'ip_address': safe_str(record.get('GRANITE - IPv4_Address', '')),
            'device_type': safe_str(record.get('GRANITE - Type', '')),
            'team': safe_str(record.get('Team', '')),
            'metric_id': sheet_name,
            'metric_desc': metric_desc,
            'category': category,
            'extra_json': extras,
        })
    return records
def extract_report_date(filepath):
    """Return the report date embedded in the file name, or None.

    Searches the file stem (directories and the extension are ignored) for
    ``YYYY_MM_DD`` groups, e.g. ``NTS_AEO_2026_03_25.xlsx`` -> ``2026-03-25``.
    Groups that are not a real calendar date (such as ``2026_13_45``) are
    skipped, so a non-None result is always a valid ISO date.
    """
    stem = Path(filepath).stem
    for m in re.finditer(r'(\d{4})_(\d{2})_(\d{2})', stem):
        year, month, day = (int(part) for part in m.groups())
        try:
            # date() validates the components; isoformat() normalises the
            # output to zero-padded ASCII digits.
            return datetime.date(year, month, day).isoformat()
        except ValueError:
            continue
    return None
def main():
    """Parse the workbook named on the command line and print JSON to stdout.

    On success prints one JSON object with the keys ``items``, ``summary``,
    ``report_date`` and ``total``. When no path is given or the workbook
    cannot be opened, prints ``{"error": ...}`` and exits with status 1.
    A failure while parsing the Summary sheet is not fatal: it is reported
    under ``summary.parse_error`` and the detail sheets are still processed.
    """
    if len(sys.argv) < 2:
        print(json.dumps({'error': 'No file path provided'}))
        sys.exit(1)
    filepath = sys.argv[1]

    try:
        xl = pd.ExcelFile(filepath)
    except Exception as e:
        print(json.dumps({'error': f'Cannot open file: {str(e)}'}))
        sys.exit(1)

    # ExcelFile keeps the workbook open; the context manager releases the
    # handle even if a sheet parser raises.
    with xl:
        try:
            summary = parse_summary(xl)
        except Exception as e:
            summary = {'entries': [], 'overall_scores': {}, 'parse_error': str(e)}

        summary_entries = summary.get('entries', [])
        all_items = []
        for sheet_name in xl.sheet_names:
            if sheet_name in SKIP_SHEETS:
                continue
            all_items.extend(parse_sheet(xl, sheet_name, summary_entries))

    print(json.dumps({
        'items': all_items,
        'summary': summary,
        'report_date': extract_report_date(filepath),
        'total': len(all_items),
    }))


if __name__ == '__main__':
    main()