feat(compliance): add AEO compliance tracking backend
- Migration: compliance_uploads, compliance_items, compliance_notes tables
with indexes on (hostname, metric_id) identity key and team/status
- Python parser (parse_compliance_xlsx.py): reads NTS_AEO xlsx, extracts
non-compliant assets from all detail sheets, parses Summary sheet for
metric health data and overall scores, outputs JSON to stdout
- Route (/api/compliance): preview/commit upload flow with diff summary,
items endpoint grouped by hostname with seen_count tracking, metric
summary endpoint for health cards, notes endpoints keyed on
(hostname, metric_id) persisting across uploads
- server.js: register compliance router at /api/compliance
- .gitignore: exclude planning docs and xlsx source files
2026-03-31 15:06:59 -06:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
"""
|
|
|
|
|
Parse NTS_AEO compliance xlsx file and write JSON to stdout.
|
|
|
|
|
Usage: python3 parse_compliance_xlsx.py <path_to_xlsx>
|
|
|
|
|
|
|
|
|
|
Output:
|
|
|
|
|
{
|
|
|
|
|
"items": [...], # non-compliant asset rows
|
|
|
|
|
"summary": { ... }, # metric health data from Summary sheet
|
|
|
|
|
"report_date": "YYYY-MM-DD" | null,
|
|
|
|
|
"total": int
|
|
|
|
|
}
|
|
|
|
|
"""
|
|
|
|
|
import sys
|
2026-04-20 20:12:12 +00:00
|
|
|
import os
|
feat(compliance): add AEO compliance tracking backend
- Migration: compliance_uploads, compliance_items, compliance_notes tables
with indexes on (hostname, metric_id) identity key and team/status
- Python parser (parse_compliance_xlsx.py): reads NTS_AEO xlsx, extracts
non-compliant assets from all detail sheets, parses Summary sheet for
metric health data and overall scores, outputs JSON to stdout
- Route (/api/compliance): preview/commit upload flow with diff summary,
items endpoint grouped by hostname with seen_count tracking, metric
summary endpoint for health cards, notes endpoints keyed on
(hostname, metric_id) persisting across uploads
- server.js: register compliance router at /api/compliance
- .gitignore: exclude planning docs and xlsx source files
2026-03-31 15:06:59 -06:00
|
|
|
import json
|
|
|
|
|
import re
|
|
|
|
|
import pandas as pd
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
|
|
|
2026-04-20 20:12:12 +00:00
|
|
|
def load_config():
|
|
|
|
|
"""Load parser configuration from compliance_config.json."""
|
|
|
|
|
script_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
|
config_path = os.path.join(script_dir, 'compliance_config.json')
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
with open(config_path, 'r') as f:
|
|
|
|
|
config = json.load(f)
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
print(f"Error: Configuration file not found: {config_path}", file=sys.stderr)
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
except json.JSONDecodeError as e:
|
|
|
|
|
print(f"Error: Invalid JSON in configuration file {config_path}: {e}", file=sys.stderr)
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
return config
|
|
|
|
|
|
feat(compliance): add AEO compliance tracking backend
- Migration: compliance_uploads, compliance_items, compliance_notes tables
with indexes on (hostname, metric_id) identity key and team/status
- Python parser (parse_compliance_xlsx.py): reads NTS_AEO xlsx, extracts
non-compliant assets from all detail sheets, parses Summary sheet for
metric health data and overall scores, outputs JSON to stdout
- Route (/api/compliance): preview/commit upload flow with diff summary,
items endpoint grouped by hostname with seen_count tracking, metric
summary endpoint for health cards, notes endpoints keyed on
(hostname, metric_id) persisting across uploads
- server.js: register compliance router at /api/compliance
- .gitignore: exclude planning docs and xlsx source files
2026-03-31 15:06:59 -06:00
|
|
|
|
2026-04-20 20:12:12 +00:00
|
|
|
_config = load_config()
|
|
|
|
|
METRIC_CATEGORIES = _config['metric_categories']
|
|
|
|
|
CORE_COLS = set(_config['core_cols'])
|
|
|
|
|
SKIP_SHEETS = set(_config['skip_sheets'])
|
feat(compliance): add AEO compliance tracking backend
- Migration: compliance_uploads, compliance_items, compliance_notes tables
with indexes on (hostname, metric_id) identity key and team/status
- Python parser (parse_compliance_xlsx.py): reads NTS_AEO xlsx, extracts
non-compliant assets from all detail sheets, parses Summary sheet for
metric health data and overall scores, outputs JSON to stdout
- Route (/api/compliance): preview/commit upload flow with diff summary,
items endpoint grouped by hostname with seen_count tracking, metric
summary endpoint for health cards, notes endpoints keyed on
(hostname, metric_id) persisting across uploads
- server.js: register compliance router at /api/compliance
- .gitignore: exclude planning docs and xlsx source files
2026-03-31 15:06:59 -06:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def safe_str(val):
|
|
|
|
|
s = str(val).strip()
|
|
|
|
|
return '' if s == 'nan' else s
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_summary(xl):
|
|
|
|
|
"""Return { entries: [...], overall_scores: { customer_network, vertical } }"""
|
|
|
|
|
df_raw = pd.read_excel(xl, sheet_name='Summary', header=None)
|
|
|
|
|
|
|
|
|
|
overall_scores = {
|
|
|
|
|
'customer_network': float(df_raw.iloc[0, 4]) if pd.notna(df_raw.iloc[0, 4]) else None,
|
|
|
|
|
'vertical': float(df_raw.iloc[1, 4]) if pd.notna(df_raw.iloc[1, 4]) else None,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
df = pd.read_excel(xl, sheet_name='Summary', header=3)
|
|
|
|
|
# Flatten any newlines in column names
|
|
|
|
|
df.columns = [str(c).replace('\n', ' ').strip() for c in df.columns]
|
|
|
|
|
|
|
|
|
|
# Locate the sub-vertical/team column robustly
|
|
|
|
|
team_col = next((c for c in df.columns if 'Sub-Vertical' in c or 'Purchase Group' in c), None)
|
|
|
|
|
|
|
|
|
|
entries = []
|
|
|
|
|
for _, row in df.iterrows():
|
|
|
|
|
metric_id = safe_str(row.get('Metric', ''))
|
|
|
|
|
if not metric_id or metric_id in ('Metric',):
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
team = safe_str(row.get(team_col, '')) if team_col else ''
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
non_compliant = int(row.get('Non-Compliant', 0) or 0)
|
|
|
|
|
compliant = int(row.get('Compliant', 0) or 0)
|
|
|
|
|
total = int(row.get('Total', 0) or 0)
|
|
|
|
|
compliance_pct = float(row.get('Current Compliance', 0) or 0)
|
|
|
|
|
target = float(row.get('Metric Target', 0) or 0)
|
|
|
|
|
except (ValueError, TypeError):
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
entries.append({
|
|
|
|
|
'metric_id': metric_id,
|
|
|
|
|
'team': team,
|
|
|
|
|
'priority': safe_str(row.get('Priority / Non-Priority / IR', '')),
|
|
|
|
|
'non_compliant': non_compliant,
|
|
|
|
|
'compliant': compliant,
|
|
|
|
|
'total': total,
|
|
|
|
|
'compliance_pct': compliance_pct,
|
|
|
|
|
'target': target,
|
|
|
|
|
'status': safe_str(row.get('Status', '')),
|
|
|
|
|
'description': safe_str(row.get('Metric Description', '')),
|
|
|
|
|
'category': METRIC_CATEGORIES.get(metric_id, 'Other'),
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
return {'entries': entries, 'overall_scores': overall_scores}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def parse_sheet(xl, sheet_name, summary_entries):
|
|
|
|
|
"""Return list of non-compliant item dicts for a detail sheet."""
|
|
|
|
|
try:
|
|
|
|
|
df = pd.read_excel(xl, sheet_name=sheet_name, header=0)
|
|
|
|
|
except Exception:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
if df.empty:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
df.columns = [str(c).strip() for c in df.columns]
|
|
|
|
|
|
|
|
|
|
# Filter to non-compliant rows when the Compliant column exists
|
|
|
|
|
if 'Compliant' in df.columns:
|
|
|
|
|
df = df[df['Compliant'] == False]
|
|
|
|
|
|
|
|
|
|
if df.empty:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
# Look up description from summary
|
|
|
|
|
metric_desc = ''
|
|
|
|
|
for e in summary_entries:
|
|
|
|
|
if e['metric_id'] == sheet_name and e['description']:
|
|
|
|
|
metric_desc = e['description']
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
category = METRIC_CATEGORIES.get(sheet_name, 'Other')
|
|
|
|
|
|
|
|
|
|
items = []
|
|
|
|
|
for _, row in df.iterrows():
|
|
|
|
|
hostname = safe_str(row.get('Preferred - Hostname', ''))
|
|
|
|
|
if not hostname:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
ip = safe_str(row.get('GRANITE - IPv4_Address', ''))
|
|
|
|
|
device_type = safe_str(row.get('GRANITE - Type', ''))
|
|
|
|
|
team = safe_str(row.get('Team', ''))
|
|
|
|
|
|
|
|
|
|
# Everything non-core goes into extra_json
|
|
|
|
|
extra = {}
|
|
|
|
|
for col in df.columns:
|
|
|
|
|
if col in CORE_COLS:
|
|
|
|
|
continue
|
|
|
|
|
val = row.get(col)
|
|
|
|
|
if pd.isna(val) if not isinstance(val, str) else False:
|
|
|
|
|
continue
|
|
|
|
|
s = safe_str(val)
|
|
|
|
|
if s:
|
|
|
|
|
extra[col] = val.isoformat() if hasattr(val, 'isoformat') else s
|
|
|
|
|
|
|
|
|
|
items.append({
|
|
|
|
|
'hostname': hostname,
|
|
|
|
|
'ip_address': ip,
|
|
|
|
|
'device_type': device_type,
|
|
|
|
|
'team': team,
|
|
|
|
|
'metric_id': sheet_name,
|
|
|
|
|
'metric_desc': metric_desc,
|
|
|
|
|
'category': category,
|
|
|
|
|
'extra_json': extra,
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
return items
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_report_date(filepath):
|
|
|
|
|
"""Try to pull YYYY-MM-DD from the filename, e.g. NTS_AEO_2026_03_25.xlsx"""
|
|
|
|
|
stem = Path(filepath).stem
|
|
|
|
|
m = re.search(r'(\d{4})_(\d{2})_(\d{2})', stem)
|
|
|
|
|
if m:
|
|
|
|
|
return f"{m.group(1)}-{m.group(2)}-{m.group(3)}"
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
if len(sys.argv) < 2:
|
|
|
|
|
print(json.dumps({'error': 'No file path provided'}))
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
filepath = sys.argv[1]
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
xl = pd.ExcelFile(filepath)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(json.dumps({'error': f'Cannot open file: {str(e)}'}))
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
summary = parse_summary(xl)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
summary = {'entries': [], 'overall_scores': {}, 'parse_error': str(e)}
|
|
|
|
|
|
|
|
|
|
all_items = []
|
|
|
|
|
for sheet_name in xl.sheet_names:
|
|
|
|
|
if sheet_name in SKIP_SHEETS:
|
|
|
|
|
continue
|
|
|
|
|
items = parse_sheet(xl, sheet_name, summary.get('entries', []))
|
|
|
|
|
all_items.extend(items)
|
|
|
|
|
|
|
|
|
|
print(json.dumps({
|
|
|
|
|
'items': all_items,
|
|
|
|
|
'summary': summary,
|
|
|
|
|
'report_date': extract_report_date(filepath),
|
|
|
|
|
'total': len(all_items),
|
|
|
|
|
}))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
main()
|