Reads a CSV with ID and NOTES columns, matches finding IDs against the cache, and upserts notes into ivanti_finding_notes. Supports --dry-run for previewing changes, warns on unknown IDs, truncates notes over 255 chars, and skips unchanged rows. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
175 lines
5.7 KiB
Python
175 lines
5.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
import_notes_from_csv.py
|
|
------------------------
|
|
Mass-import finding notes from a CSV file into the CVE dashboard database.
|
|
|
|
CSV format (header row required, column names are case-insensitive):
|
|
ID,NOTES
|
|
12345,EXC-5754
|
|
67890,EXC-6001 - pending review
|
|
|
|
Usage:
|
|
python3 import_notes_from_csv.py <csv_file> [--db <db_path>] [--dry-run]
|
|
|
|
Options:
|
|
--db <path> Path to cve_database.db (default: ../cve_database.db)
|
|
--dry-run Print what would change without touching the database
|
|
"""
|
|
|
|
import csv
|
|
import sqlite3
|
|
import sys
|
|
import os
|
|
import argparse
|
|
from datetime import datetime, timezone
|
|
|
|
NOTE_MAX_LEN = 255
|
|
|
|
DEFAULT_DB = os.path.join(os.path.dirname(__file__), '..', 'cve_database.db')
|
|
|
|
|
|
def parse_args():
|
|
p = argparse.ArgumentParser(description='Import finding notes from CSV into the dashboard DB.')
|
|
p.add_argument('csv_file', help='Path to the CSV file (must have ID and NOTES columns)')
|
|
p.add_argument('--db', default=DEFAULT_DB, help=f'Path to SQLite database (default: {DEFAULT_DB})')
|
|
p.add_argument('--dry-run', action='store_true', help='Preview changes without writing to DB')
|
|
return p.parse_args()
|
|
|
|
|
|
def load_csv(path):
|
|
"""Read CSV and return list of (finding_id, note) tuples."""
|
|
rows = []
|
|
with open(path, newline='', encoding='utf-8-sig') as f:
|
|
reader = csv.DictReader(f)
|
|
# Normalise header names to uppercase for case-insensitive matching
|
|
if reader.fieldnames is None:
|
|
print('ERROR: CSV file is empty or has no header row.')
|
|
sys.exit(1)
|
|
|
|
normalised = {k.strip().upper(): k for k in reader.fieldnames}
|
|
if 'ID' not in normalised or 'NOTES' not in normalised:
|
|
print(f'ERROR: CSV must have "ID" and "NOTES" columns.')
|
|
print(f' Found columns: {list(reader.fieldnames)}')
|
|
sys.exit(1)
|
|
|
|
id_col = normalised['ID']
|
|
notes_col = normalised['NOTES']
|
|
|
|
for i, row in enumerate(reader, start=2): # start=2 because row 1 is the header
|
|
finding_id = row[id_col].strip()
|
|
note = row[notes_col].strip()
|
|
|
|
if not finding_id:
|
|
print(f' WARNING row {i}: empty ID — skipping')
|
|
continue
|
|
|
|
if len(note) > NOTE_MAX_LEN:
|
|
print(f' WARNING row {i} ({finding_id}): note is {len(note)} chars, '
|
|
f'truncating to {NOTE_MAX_LEN}')
|
|
note = note[:NOTE_MAX_LEN]
|
|
|
|
rows.append((finding_id, note))
|
|
|
|
return rows
|
|
|
|
|
|
def run(args):
|
|
csv_path = os.path.abspath(args.csv_file)
|
|
db_path = os.path.abspath(args.db)
|
|
|
|
# ------------------------------------------------------------------ checks
|
|
if not os.path.exists(csv_path):
|
|
print(f'ERROR: CSV file not found: {csv_path}')
|
|
sys.exit(1)
|
|
|
|
if not os.path.exists(db_path):
|
|
print(f'ERROR: Database not found: {db_path}')
|
|
sys.exit(1)
|
|
|
|
print(f'CSV : {csv_path}')
|
|
print(f'DB : {db_path}')
|
|
if args.dry_run:
|
|
print('MODE: DRY RUN — no changes will be written\n')
|
|
else:
|
|
print()
|
|
|
|
# ----------------------------------------------------------------- load CSV
|
|
rows = load_csv(csv_path)
|
|
if not rows:
|
|
print('No valid rows found in CSV.')
|
|
sys.exit(0)
|
|
|
|
print(f'Loaded {len(rows)} row(s) from CSV.\n')
|
|
|
|
# ---------------------------------------------------------------- open DB
|
|
con = sqlite3.connect(db_path)
|
|
con.row_factory = sqlite3.Row
|
|
cur = con.cursor()
|
|
|
|
# Fetch all known finding IDs so we can warn about mismatches
|
|
cur.execute('SELECT total, findings_json FROM ivanti_findings_cache WHERE id = 1')
|
|
cache_row = cur.fetchone()
|
|
known_ids = set()
|
|
if cache_row and cache_row['findings_json']:
|
|
import json
|
|
try:
|
|
findings = json.loads(cache_row['findings_json'])
|
|
known_ids = {str(f['id']) for f in findings}
|
|
except Exception:
|
|
pass # non-fatal — we'll still import and just skip the warning
|
|
|
|
# ----------------------------------------------------------------- process
|
|
inserted = 0
|
|
updated = 0
|
|
skipped = 0
|
|
|
|
for finding_id, note in rows:
|
|
str_id = str(finding_id)
|
|
|
|
if known_ids and str_id not in known_ids:
|
|
print(f' WARNING: finding ID "{str_id}" not found in current cache — '
|
|
f'note will be stored but won\'t display until a sync pulls that finding')
|
|
|
|
# Check if a note already exists
|
|
cur.execute('SELECT note FROM ivanti_finding_notes WHERE finding_id = ?', (str_id,))
|
|
existing = cur.fetchone()
|
|
|
|
if existing:
|
|
if existing['note'] == note:
|
|
print(f' SKIP {str_id} — note unchanged')
|
|
skipped += 1
|
|
continue
|
|
action = 'UPDATE'
|
|
updated += 1
|
|
else:
|
|
action = 'INSERT'
|
|
inserted += 1
|
|
|
|
print(f' {action:6s} {str_id} → {note[:80]}{"…" if len(note) > 80 else ""}')
|
|
|
|
if not args.dry_run:
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO ivanti_finding_notes (finding_id, note, updated_at)
|
|
VALUES (?, ?, datetime('now'))
|
|
ON CONFLICT(finding_id) DO UPDATE
|
|
SET note = excluded.note, updated_at = datetime('now')
|
|
""",
|
|
(str_id, note)
|
|
)
|
|
|
|
# ----------------------------------------------------------------- summary
|
|
print()
|
|
if args.dry_run:
|
|
print(f'DRY RUN complete — would insert {inserted}, update {updated}, skip {skipped}.')
|
|
else:
|
|
con.commit()
|
|
print(f'Done — inserted {inserted}, updated {updated}, skipped {skipped} (unchanged).')
|
|
|
|
con.close()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
run(parse_args())
|