"""
Memory Relevance Scorer — RSO Phase 2.

Scores every indexed memory file using the formula from the RSO spec:

    Score = (access_frequency × 3) + (influence_rate × 5)
            - (age_days × 0.1) - (staleness_risk × 2)

Tiers:
    core    (>8)  : High-value, actively referenced — keep at top of retrieval
    active  (3–8) : In-use memory — maintain as-is
    archive (0–3) : Low-signal, old, or redundant — candidate for archival
    stale   (<0)  : High staleness risk, never accessed — recommend archival

Access frequency is tracked via the memory_access_log table (added to
memory_index.db in Phase 2). On first run there is no history; scores will
be age + staleness only. Frequency builds from the next agent session onward.

Output: memory_workspace/observation/summaries/memory-scores-YYYY-MM-DD.json
"""

import json
import re
import sqlite3
import threading
import time
from datetime import date, datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

# ---------------------------------------------------------------------------
# Staleness heuristic patterns
# ---------------------------------------------------------------------------

_RE_IP = re.compile(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b")
_RE_CREDENTIALS = re.compile(
    r"\b(password|passwd|credential|api[_\s\-]?key|token|secret)\b",
    re.IGNORECASE,
)
_RE_STATUS = re.compile(
    r"\b(running|stopped|active|inactive|enabled|disabled|up|down)\b",
    re.IGNORECASE,
)
_RE_VERSION = re.compile(r"v\d+\.\d+(?:\.\d+)?|\bversion\s+\d", re.IGNORECASE)
# NOTE(review): only years 2020-2029 are recognised as content dates.
_RE_DATE = re.compile(r"(202[0-9])-(\d{2})-(\d{2})")
_RE_DAILY_NAME = re.compile(r"(\d{4})-(\d{2})-(\d{2})\.md$")

# Single source of truth for the pattern-based staleness heuristics:
# (flag name reported in the output, pattern, weight added to the risk score).
# Keeping flags and weights together stops the two from drifting apart.
_STALENESS_RULES = (
    ("ip_addresses", _RE_IP, 1.0),
    ("credentials", _RE_CREDENTIALS, 1.0),
    ("status_references", _RE_STATUS, 0.5),
    ("version_numbers", _RE_VERSION, 0.5),
)


class MemoryRelevanceScorer:
    """Score all indexed memory files for the weekly reflection agent."""

    def __init__(self, workspace_dir: str) -> None:
        """Bind the scorer to a workspace and ensure the summaries dir exists.

        Args:
            workspace_dir: Directory containing ``memory_index.db``; file paths
                stored in the index are resolved relative to it.
        """
        self._workspace = Path(workspace_dir)
        self._db_path = self._workspace / "memory_index.db"
        self._summaries_dir = (
            self._workspace / "observation" / "summaries"
        )
        self._summaries_dir.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def score_all(self, lookback_days: int = 30) -> Dict[str, Any]:
        """Score every indexed memory file. Returns full report dict.

        Cold-start mode: when the access log is empty (no history yet), the
        full spec formula degrades everything to stale — useless output.
        In cold-start, a baseline of 5.0 is used so age and staleness can
        still differentiate files while access data accumulates.

        Full formula (once data exists):
            score = (access × 3) + (influence × 5) - (age × 0.1) - (staleness × 2)

        Cold-start formula:
            score = 5.0 - (age × 0.05) - (staleness × 2)

        Args:
            lookback_days: Size of the window (in days, counted back from
                now) in which access-log rows are considered.

        Returns:
            A JSON-serialisable dict with report metadata, per-tier counts
            under ``"summary"``, the ``"archive_recommendations"`` subset and
            all scored ``"entries"`` sorted by ascending score.

        Raises:
            sqlite3.OperationalError: If the ``files`` table cannot be queried.
        """
        # accessed_at is compared against an epoch-milliseconds cutoff.
        cutoff_ms = int((time.time() - lookback_days * 86400) * 1000)
        today = date.today()

        db = sqlite3.connect(str(self._db_path), check_same_thread=False)
        db.row_factory = sqlite3.Row
        try:
            files = db.execute(
                "SELECT path, mtime, size FROM files ORDER BY mtime ASC"
            ).fetchall()

            # Determine cold-start: any accesses at all in the lookback window?
            total_accesses = self._total_access_count(db, cutoff_ms)
            cold_start = total_accesses == 0

            scored: List[Dict[str, Any]] = [
                self._score_file(db, row, cutoff_ms, today, cold_start)
                for row in files
            ]
        finally:
            db.close()

        # Lowest scores first, so the weakest memories lead the report.
        scored.sort(key=lambda x: x["score"])

        tier_counts = {"core": 0, "active": 0, "archive": 0, "stale": 0}
        for e in scored:
            tier_counts[e["tier"]] = tier_counts.get(e["tier"], 0) + 1

        note: Optional[str] = None
        if cold_start:
            note = (
                "COLD START: no access history yet. Scores use age+staleness only "
                "(baseline 5.0, age penalty 0.05/day). Full formula activates once "
                "memory_access_log accumulates data from live sessions."
            )

        return {
            "generated_at": datetime.now().astimezone().isoformat(),
            "lookback_days": lookback_days,
            "cold_start": cold_start,
            "files_scored": len(scored),
            "note": note,
            "summary": {
                "core_memory": tier_counts["core"],
                "active_memory": tier_counts["active"],
                "archive_candidates": tier_counts["archive"],
                "stale_candidates": tier_counts["stale"],
            },
            "archive_recommendations": [
                e
                for e in scored
                if e["recommendation"] == "archive" and e["age_days"] >= 30
            ],
            "entries": scored,
        }

    def write_report(self, lookback_days: int = 30) -> Path:
        """Generate and write JSON report; returns the output path."""
        report = self.score_all(lookback_days)
        today = datetime.now().strftime("%Y-%m-%d")
        out_path = self._summaries_dir / f"memory-scores-{today}.json"
        out_path.write_text(
            json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8"
        )
        print(
            f"[MemoryScorer] Report written -> {out_path.name} "
            f"({report['files_scored']} files, "
            f"{report['summary']['archive_candidates']} archive candidates, "
            f"{report['summary']['stale_candidates']} stale)"
        )
        return out_path

    def print_summary(self, lookback_days: int = 30) -> None:
        """Print a human-readable summary table to stdout."""
        report = self.score_all(lookback_days)
        s = report["summary"]
        sep = "-" * 60
        print(
            f"\n{sep}\n"
            f"Memory Relevance Report ({report['generated_at'][:10]})\n"
            f"Lookback: {lookback_days}d | Files scored: {report['files_scored']}\n"
            f"{sep}\n"
            f" Core (>8) : {s['core_memory']:3d}\n"
            f" Active (3-8) : {s['active_memory']:3d}\n"
            f" Archive (0-3) : {s['archive_candidates']:3d}\n"
            f" Stale (<0) : {s['stale_candidates']:3d}\n"
            f"{sep}"
        )
        if report.get("note"):
            print(f" NOTE: {report['note']}")

        archive = report["archive_recommendations"]
        if archive:
            # Plain string: no placeholders, so no f-prefix is needed.
            print("\n Archive candidates (age >=30d, score <3):")
            # Only the ten lowest-scoring candidates are listed in full.
            for e in archive[:10]:
                flags = ", ".join(e["staleness_flags"]) or "none"
                print(
                    f" {e['path']:<40} "
                    f"score={e['score']:>6.2f} "
                    f"age={e['age_days']:>5.0f}d "
                    f"flags=[{flags}]"
                )
            if len(archive) > 10:
                print(f" ... and {len(archive) - 10} more")
        print()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _score_file(
        self,
        db: sqlite3.Connection,
        row: Any,
        cutoff_ms: int,
        today: date,
        cold_start: bool,
    ) -> Dict[str, Any]:
        """Build the report entry for one row of the ``files`` table."""
        path = row["path"]
        content = self._read_file(path)
        access_count = self._access_count(db, path, cutoff_ms)
        age_days = self._age_days(path, row["mtime"], today)
        staleness_risk = self._staleness_risk(content, today)
        influence_rate = self._influence_proxy(access_count)

        if cold_start:
            # Gentler age decay (0.05 instead of 0.1); baseline of 5
            # so files don't all collapse to stale before we have data.
            score = 5.0 - (age_days * 0.05) - (staleness_risk * 2)
        else:
            score = (
                (access_count * 3)
                + (influence_rate * 5)
                - (age_days * 0.1)
                - (staleness_risk * 2)
            )

        # Tier and recommendation are derived from the unrounded values.
        tier = _tier(score)
        return {
            "path": path,
            "score": round(score, 2),
            "tier": tier,
            "age_days": round(age_days, 1),
            "access_frequency": access_count,
            "influence_rate": round(influence_rate, 2),
            "staleness_risk": round(staleness_risk, 2),
            "staleness_flags": self._staleness_flags(content),
            "recommendation": _recommendation(tier, age_days),
            "cold_start": cold_start,
        }

    def _read_file(self, rel_path: str) -> str:
        """Return the file's text, or ``""`` if it cannot be read.

        Deliberately best-effort: an index entry whose file is missing or
        undecodable is still scored, just with no content heuristics.
        """
        try:
            return (self._workspace / rel_path).read_text(encoding="utf-8")
        except Exception:
            return ""

    def _total_access_count(
        self, db: sqlite3.Connection, cutoff_ms: int
    ) -> int:
        """Total accesses across all paths in the lookback window."""
        try:
            row = db.execute(
                "SELECT COUNT(*) AS n FROM memory_access_log WHERE accessed_at >= ?",
                (cutoff_ms,),
            ).fetchone()
            return row["n"] if row else 0
        except sqlite3.OperationalError:
            # Treated as "no history", e.g. the table does not exist yet.
            return 0

    def _access_count(
        self, db: sqlite3.Connection, path: str, cutoff_ms: int
    ) -> int:
        """Number of logged accesses of ``path`` in the lookback window."""
        try:
            row = db.execute(
                "SELECT COUNT(*) AS n FROM memory_access_log "
                "WHERE path = ? AND accessed_at >= ?",
                (path, cutoff_ms),
            ).fetchone()
            return row["n"] if row else 0
        except sqlite3.OperationalError:
            # Table doesn't exist yet on very first run before schema migration
            return 0

    def _age_days(
        self, path: str, mtime_ms: int, today: date
    ) -> float:
        """Age in days — prefer date extracted from filename for daily logs.

        Falls back to the index mtime (epoch milliseconds) when the filename
        carries no valid ``YYYY-MM-DD.md`` date. The result is clamped to
        ``0.0``: a date or mtime in the future would otherwise yield a negative
        age, which the scoring formulas would turn into a score *bonus*.
        """
        m = _RE_DAILY_NAME.search(path)
        if m:
            try:
                file_date = date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
            except ValueError:
                # Digits that do not form a real calendar date: use mtime.
                pass
            else:
                return max(0.0, float((today - file_date).days))
        return max(0.0, (time.time() - mtime_ms / 1000) / 86400)

    def _staleness_risk(self, content: str, today: date) -> float:
        """0.0–3.0 staleness score from content heuristics."""
        score = 0.0
        for _flag, pattern, weight in _STALENESS_RULES:
            if pattern.search(content):
                score += weight

        # Past dates mentioned in content (more than 30 days ago)
        for m in _RE_DATE.finditer(content):
            try:
                mentioned = date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
            except ValueError:
                continue
            if (today - mentioned).days > 30:
                score += 0.5
                break  # Only penalise once per file
        return min(score, 3.0)

    def _staleness_flags(self, content: str) -> List[str]:
        """Names of the pattern-based staleness heuristics matching ``content``.

        Past-date mentions contribute to the risk score but are not reported
        as a flag.
        """
        return [
            flag
            for flag, pattern, _weight in _STALENESS_RULES
            if pattern.search(content)
        ]

    @staticmethod
    def _influence_proxy(access_count: int) -> float:
        """Proxy for influence rate — no real data until access log fills."""
        if access_count >= 5:
            return 0.8
        if access_count >= 2:
            return 0.5
        if access_count == 1:
            return 0.3
        return 0.0


# ---------------------------------------------------------------------------
# Pure functions
# ---------------------------------------------------------------------------


def _tier(score: float) -> str:
    """Map a score to its tier: core (>8), active (3–8), archive (0–3), stale (<0)."""
    if score > 8:
        return "core"
    if score >= 3:
        return "active"
    if score >= 0:
        return "archive"
    return "stale"


def _recommendation(tier: str, age_days: float) -> str:
    """Return ``"keep"``, ``"monitor"`` or ``"archive"`` for a scored file."""
    if tier in ("core", "active"):
        return "keep"
    if tier == "archive":
        return "archive" if age_days >= 60 else "monitor"
    # stale — archive rather than delete (Phase 3 safety rule)
    return "archive"


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    import sys

    workspace = sys.argv[1] if len(sys.argv) > 1 else "./memory_workspace"
    scorer = MemoryRelevanceScorer(workspace)
    scorer.print_summary()
    path = scorer.write_report()
    print(f"Full report: {path}")