#!/usr/bin/env python3
"""
import_notes_from_csv.py
------------------------
Mass-import finding notes from a CSV file into the CVE dashboard database.

CSV format (header row required, column names are case-insensitive):
    ID,NOTES
    12345,EXC-5754
    67890,EXC-6001 - pending review

Usage:
    python3 import_notes_from_csv.py <csv_file> [--db <path>] [--dry-run]

Options:
    --db <path>  Path to cve_database.db (default: ../cve_database.db)
    --dry-run    Print what would change without touching the database
"""

import csv
import json
import sqlite3
import sys
import os
import argparse
from datetime import datetime, timezone

# Notes longer than this are truncated (with a warning) before being stored.
NOTE_MAX_LEN = 255

# Default database location: one directory above this script.
DEFAULT_DB = os.path.join(os.path.dirname(__file__), '..', 'cve_database.db')


def parse_args():
    """Parse command-line arguments.

    Returns:
        argparse.Namespace with ``csv_file``, ``db`` and ``dry_run`` attributes.
    """
    p = argparse.ArgumentParser(description='Import finding notes from CSV into the dashboard DB.')
    p.add_argument('csv_file', help='Path to the CSV file (must have ID and NOTES columns)')
    p.add_argument('--db', default=DEFAULT_DB, help=f'Path to SQLite database (default: {DEFAULT_DB})')
    p.add_argument('--dry-run', action='store_true', help='Preview changes without writing to DB')
    return p.parse_args()


def load_csv(path):
    """Read the CSV at *path* and return a list of ``(finding_id, note)`` tuples.

    Header names are matched case-insensitively and ignoring surrounding
    whitespace. Rows with an empty ID are skipped with a warning; notes longer
    than ``NOTE_MAX_LEN`` are truncated with a warning. A row that is missing
    the NOTES field entirely is imported with an empty note.

    Exits the process with status 1 if the file has no header row or lacks
    the required ``ID`` / ``NOTES`` columns.
    """
    rows = []
    with open(path, newline='', encoding='utf-8-sig') as f:
        reader = csv.DictReader(f)
        if reader.fieldnames is None:
            print('ERROR: CSV file is empty or has no header row.')
            sys.exit(1)

        # Map the upper-cased header name back to the header as written in
        # the file, so lookups work regardless of the file's casing.
        normalised = {k.strip().upper(): k for k in reader.fieldnames}
        if 'ID' not in normalised or 'NOTES' not in normalised:
            print('ERROR: CSV must have "ID" and "NOTES" columns.')
            print(f'       Found columns: {list(reader.fieldnames)}')
            sys.exit(1)

        id_col = normalised['ID']
        notes_col = normalised['NOTES']

        for row in reader:
            # DictReader silently skips blank lines, so a simple counter would
            # drift; line_num is the physical line the record ended on.
            i = reader.line_num

            # DictReader fills fields missing from a short row with None,
            # so guard before calling .strip().
            finding_id = (row.get(id_col) or '').strip()
            note = (row.get(notes_col) or '').strip()

            if not finding_id:
                print(f'  WARNING row {i}: empty ID — skipping')
                continue

            if len(note) > NOTE_MAX_LEN:
                print(f'  WARNING row {i} ({finding_id}): note is {len(note)} chars, '
                      f'truncating to {NOTE_MAX_LEN}')
                note = note[:NOTE_MAX_LEN]

            rows.append((finding_id, note))

    return rows


def _load_known_ids(cur):
    """Return the set of finding IDs (as strings) found in the findings cache.

    The result is only used to warn about IDs that are not in the cache, so
    every failure here is non-fatal: a warning is printed and an empty set is
    returned, which disables the mismatch check.
    """
    try:
        cur.execute('SELECT total, findings_json FROM ivanti_findings_cache WHERE id = 1')
        cache_row = cur.fetchone()
    except sqlite3.Error as exc:
        print(f'  WARNING: could not read findings cache ({exc}) — skipping ID check')
        return set()

    if not cache_row or not cache_row['findings_json']:
        return set()

    try:
        findings = json.loads(cache_row['findings_json'])
        return {str(f['id']) for f in findings}
    except (ValueError, KeyError, TypeError) as exc:
        # ValueError covers json.JSONDecodeError; KeyError/TypeError cover a
        # cache payload that is not a list of objects with an "id" key.
        print(f'  WARNING: could not parse findings cache ({exc!r}) — skipping ID check')
        return set()


def run(args):
    """Import notes from ``args.csv_file`` into the database at ``args.db``.

    For each CSV row the note is inserted, updated, or skipped (when the
    stored note is already identical). With ``args.dry_run`` set, the same
    report is printed but nothing is written. All writes are committed
    together at the end; if an error occurs part-way through, nothing is
    committed and the connection is still closed.

    Exits with status 1 if the CSV or database file does not exist, and with
    status 0 if the CSV contains no valid rows.
    """
    csv_path = os.path.abspath(args.csv_file)
    db_path = os.path.abspath(args.db)

    # ------------------------------------------------------------------ checks
    if not os.path.exists(csv_path):
        print(f'ERROR: CSV file not found: {csv_path}')
        sys.exit(1)

    if not os.path.exists(db_path):
        print(f'ERROR: Database not found: {db_path}')
        sys.exit(1)

    print(f'CSV : {csv_path}')
    print(f'DB  : {db_path}')
    if args.dry_run:
        print('MODE: DRY RUN — no changes will be written\n')
    else:
        print()

    # ----------------------------------------------------------------- load CSV
    rows = load_csv(csv_path)
    if not rows:
        print('No valid rows found in CSV.')
        sys.exit(0)

    print(f'Loaded {len(rows)} row(s) from CSV.\n')

    # ---------------------------------------------------------------- open DB
    con = sqlite3.connect(db_path)
    try:
        con.row_factory = sqlite3.Row
        cur = con.cursor()

        # Fetch all known finding IDs so we can warn about mismatches
        known_ids = _load_known_ids(cur)

        # ------------------------------------------------------------- process
        inserted = 0
        updated = 0
        skipped = 0

        for finding_id, note in rows:
            str_id = str(finding_id)

            if known_ids and str_id not in known_ids:
                print(f'  WARNING: finding ID "{str_id}" not found in current cache — '
                      f'note will be stored but won\'t display until a sync pulls that finding')

            # Check if a note already exists
            cur.execute('SELECT note FROM ivanti_finding_notes WHERE finding_id = ?', (str_id,))
            existing = cur.fetchone()

            if existing:
                if existing['note'] == note:
                    print(f'  SKIP   {str_id} — note unchanged')
                    skipped += 1
                    continue
                action = 'UPDATE'
                updated += 1
            else:
                action = 'INSERT'
                inserted += 1

            print(f'  {action:6s} {str_id} → {note[:80]}{"…" if len(note) > 80 else ""}')

            if not args.dry_run:
                cur.execute(
                    """
                    INSERT INTO ivanti_finding_notes (finding_id, note, updated_at)
                    VALUES (?, ?, datetime('now'))
                    ON CONFLICT(finding_id) DO UPDATE
                        SET note = excluded.note, updated_at = datetime('now')
                    """,
                    (str_id, note)
                )

        # ------------------------------------------------------------- summary
        print()
        if args.dry_run:
            print(f'DRY RUN complete — would insert {inserted}, update {updated}, skip {skipped}.')
        else:
            con.commit()
            print(f'Done — inserted {inserted}, updated {updated}, skipped {skipped} (unchanged).')
    finally:
        # Closing without commit discards any uncommitted writes, so a failure
        # mid-import leaves the database untouched.
        con.close()


if __name__ == '__main__':
    run(parse_args())