Files
ajarbot/observation/memory_scorer.py

349 lines
13 KiB
Python
Raw Permalink Normal View History

feat: RSO observation system, child safety, Discord adapter, Telegram watchdog, email attachments Core agent improvements: - RSO (Relevance Scoring & Observation) system: interaction_logger, memory_scorer, signal_detector - Memory access logging (memory_access_log table) for relevance scoring; high-signal turn detection - Rich conversation storage for notable turns; compact_conversation truncates long user messages - Task-type classifier (query/action/analysis/creative) for observation tagging - Nested sub-agent visibility: deep delegations now register against the main agent's manager Child safety (Gabriel profile): - child_safety.py: filtering, audit logging, prompt constants for restricted sessions - .kiro/specs/child-safety-profile: requirements, design, tasks specs - GABRIEL_BOT_PROPOSAL.md: initial proposal doc - Reduced context window (10 msgs) and tutor-mode identity for restricted users Telegram adapter: - Polling watchdog: auto-restarts updater if polling drops unexpectedly - get_me() with exponential-backoff retry on NetworkError at startup - Correct stop() ordering: signal watchdog before cancelling tasks Email / Gmail: - send_email: supports file attachments (attachments list param) - get_email: surfaces attachment metadata in response Scheduled tasks / weather: - Remove OpenWeatherMap API calls from morning-weather task; use wttr.in exclusively - New scheduled tasks and scheduler state persistence Discord: - adapters/discord/__init__.py scaffold - discord-plugin: MCP plugin for Claude Code Discord integration (server.ts, skills, config) Infrastructure: - n8n workflow exports (garvis_webhook, content_pipeline variants) - memory_workspace: context, homelab-repo-updates, weekly observation summaries, error logs - UCS C240 migration plan doc - requirements.txt: new deps - .claude/settings.json, fix_hooks.py: hook/permission tuning
2026-04-23 07:54:01 -06:00
"""
Memory Relevance Scorer — RSO Phase 2.
Scores every indexed memory file using the formula from the RSO spec:
Score = (access_frequency × 3) + (influence_rate × 5)
- (age_days × 0.1) - (staleness_risk × 2)
Tiers:
    core    (>8)  : High-value, actively referenced — keep at top of retrieval
    active  (3–8) : In-use memory — maintain as-is
    archive (0–3) : Low-signal, old, or redundant — candidate for archival
    stale   (<0)  : High staleness risk, never accessed — recommend archival
Access frequency is tracked via the memory_access_log table (added to
memory_index.db in Phase 2). On first run there is no history; scores will
be age + staleness only. Frequency builds from the next agent session onward.
Output: memory_workspace/observation/summaries/memory-scores-YYYY-MM-DD.json
"""
import json
import re
import sqlite3
import threading
import time
from datetime import date, datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
# ---------------------------------------------------------------------------
# Staleness heuristic patterns
# ---------------------------------------------------------------------------
# Dotted quad made of four 1-3 digit groups. The octet range is not
# validated, so strings such as "999.1.1.1" or a four-part version number
# also match.
_RE_IP = re.compile(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b")
# Credential-related keywords as whole words, case-insensitive. "api key"
# matches with "_", "-", one whitespace character, or nothing between the
# two words.
_RE_CREDENTIALS = re.compile(
r"\b(password|passwd|credential|api[_\s\-]?key|token|secret)\b",
re.IGNORECASE,
)
# Service/state words as whole words, case-insensitive. Note that common
# English words such as "up", "down" and "active" are included.
_RE_STATUS = re.compile(
r"\b(running|stopped|active|inactive|enabled|disabled|up|down)\b",
re.IGNORECASE,
)
# Either "v<major>.<minor>" with an optional ".<patch>" (no leading word
# boundary, so it can match inside a longer word) or the word "version"
# followed by whitespace and a digit. Case-insensitive.
_RE_VERSION = re.compile(r"v\d+\.\d+(?:\.\d+)?|\bversion\s+\d", re.IGNORECASE)
# YYYY-MM-DD with a year in 2020-2029; groups are (year, month, day).
# Month/day ranges are not checked by the pattern itself.
_RE_DATE = re.compile(r"(202[0-9])-(\d{2})-(\d{2})")
# Path ending in "YYYY-MM-DD.md"; groups are (year, month, day).
_RE_DAILY_NAME = re.compile(r"(\d{4})-(\d{2})-(\d{2})\.md$")
class MemoryRelevanceScorer:
    """Score all indexed memory files for the weekly reflection agent.

    The scorer reads the ``files`` table (and, when present, the
    ``memory_access_log`` table) of ``<workspace>/memory_index.db`` and the
    file contents under the workspace, then assigns every file a score, a
    tier and a recommendation.

    NOTE(review): ``files.mtime`` and ``memory_access_log.accessed_at`` are
    treated as epoch *milliseconds* throughout; confirm against the code
    that writes those tables.
    """

    # Penalty contributed by each flag that _staleness_flags() can report.
    _STALENESS_WEIGHTS: Dict[str, float] = {
        "ip_addresses": 1.0,
        "credentials": 1.0,
        "status_references": 0.5,
        "version_numbers": 0.5,
    }
    # A date older than this many days mentioned in the content adds a
    # one-off penalty to the staleness risk.
    _OLD_DATE_DAYS = 30
    _OLD_DATE_PENALTY = 0.5
    # Upper bound of the staleness risk.
    _MAX_STALENESS = 3.0

    def __init__(self, workspace_dir: str) -> None:
        """Bind the scorer to *workspace_dir* and ensure the output dir exists.

        Creates ``<workspace_dir>/observation/summaries`` (including
        parents) if it is missing.
        """
        self._workspace = Path(workspace_dir)
        self._db_path = self._workspace / "memory_index.db"
        self._summaries_dir = (
            self._workspace / "observation" / "summaries"
        )
        self._summaries_dir.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def score_all(self, lookback_days: int = 30) -> Dict[str, Any]:
        """Score every indexed memory file and return the full report dict.

        Cold-start mode: when the access log has no rows inside the
        lookback window, the full spec formula degrades everything to
        stale — useless output. In cold-start, a baseline of 5.0 is used
        so age and staleness can still differentiate files while access
        data accumulates.

        Full formula (once data exists):
            score = (access × 3) + (influence × 5) - (age × 0.1) - (staleness × 2)
        Cold-start formula:
            score = 5.0 - (age × 0.05) - (staleness × 2)

        Args:
            lookback_days: Size of the access-history window, in days.

        Returns:
            A dict with report metadata, per-tier counts under
            ``"summary"``, the filtered ``"archive_recommendations"`` and
            every scored file under ``"entries"`` (lowest score first).

        Raises:
            sqlite3.OperationalError: If the ``files`` table cannot be
                queried (for example, the index database does not exist
                yet). A missing ``memory_access_log`` table is tolerated.
        """
        cutoff_ms = int((time.time() - lookback_days * 86400) * 1000)
        today = date.today()

        db = sqlite3.connect(str(self._db_path), check_same_thread=False)
        db.row_factory = sqlite3.Row
        try:
            files = db.execute(
                "SELECT path, mtime, size FROM files ORDER BY mtime ASC"
            ).fetchall()
            # One grouped query instead of one COUNT(*) per indexed file.
            access_counts = self._access_counts(db, cutoff_ms)
        finally:
            db.close()

        # Cold start: no accesses at all (for any path) in the window.
        cold_start = sum(access_counts.values()) == 0

        scored: List[Dict[str, Any]] = []
        for row in files:
            path = row["path"]
            content = self._read_file(path)
            access_count = access_counts.get(path, 0)
            age_days = self._age_days(path, row["mtime"], today)
            # Flags are computed once and reused for the risk value.
            flags = self._staleness_flags(content)
            staleness_risk = self._staleness_risk(content, today, flags)
            influence_rate = self._influence_proxy(access_count)

            if cold_start:
                # Gentler age decay (0.05 instead of 0.1); baseline of 5
                # so files don't all collapse to stale before we have data.
                score = 5.0 - (age_days * 0.05) - (staleness_risk * 2)
            else:
                score = (
                    (access_count * 3)
                    + (influence_rate * 5)
                    - (age_days * 0.1)
                    - (staleness_risk * 2)
                )

            tier = _tier(score)
            scored.append(
                {
                    "path": path,
                    "score": round(score, 2),
                    "tier": tier,
                    "age_days": round(age_days, 1),
                    "access_frequency": access_count,
                    "influence_rate": round(influence_rate, 2),
                    "staleness_risk": round(staleness_risk, 2),
                    "staleness_flags": flags,
                    "recommendation": _recommendation(tier, age_days),
                    "cold_start": cold_start,
                }
            )

        # Lowest scores first so the weakest files lead the report.
        scored.sort(key=lambda entry: entry["score"])

        tier_counts = {"core": 0, "active": 0, "archive": 0, "stale": 0}
        for entry in scored:
            tier_counts[entry["tier"]] = tier_counts.get(entry["tier"], 0) + 1

        note: Optional[str] = None
        if cold_start:
            note = (
                "COLD START: no access history yet. Scores use age+staleness only "
                "(baseline 5.0, age penalty 0.05/day). Full formula activates once "
                "memory_access_log accumulates data from live sessions."
            )

        return {
            "generated_at": datetime.now().astimezone().isoformat(),
            "lookback_days": lookback_days,
            "cold_start": cold_start,
            "files_scored": len(scored),
            "note": note,
            "summary": {
                "core_memory": tier_counts["core"],
                "active_memory": tier_counts["active"],
                "archive_candidates": tier_counts["archive"],
                "stale_candidates": tier_counts["stale"],
            },
            "archive_recommendations": [
                entry
                for entry in scored
                if entry["recommendation"] == "archive"
                and entry["age_days"] >= 30
            ],
            "entries": scored,
        }

    def write_report(self, lookback_days: int = 30) -> Path:
        """Generate the report, write it as JSON and return the output path.

        The file is ``memory-scores-YYYY-MM-DD.json`` (local date) inside
        the summaries directory; an existing file of the same name is
        overwritten. A one-line confirmation is printed to stdout.
        """
        report = self.score_all(lookback_days)
        today = datetime.now().strftime("%Y-%m-%d")
        out_path = self._summaries_dir / f"memory-scores-{today}.json"
        out_path.write_text(
            json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8"
        )
        print(
            f"[MemoryScorer] Report written -> {out_path.name} "
            f"({report['files_scored']} files, "
            f"{report['summary']['archive_candidates']} archive candidates, "
            f"{report['summary']['stale_candidates']} stale)"
        )
        return out_path

    def print_summary(self, lookback_days: int = 30) -> None:
        """Print a human-readable summary table to stdout.

        Shows the per-tier counts, the cold-start note when present, and
        at most the first ten archive recommendations.
        """
        report = self.score_all(lookback_days)
        s = report["summary"]
        sep = "-" * 60
        print(
            f"\n{sep}\n"
            f"Memory Relevance Report ({report['generated_at'][:10]})\n"
            f"Lookback: {lookback_days}d | Files scored: {report['files_scored']}\n"
            f"{sep}\n"
            f" Core (>8) : {s['core_memory']:3d}\n"
            f" Active (3-8) : {s['active_memory']:3d}\n"
            f" Archive (0-3) : {s['archive_candidates']:3d}\n"
            f" Stale (<0) : {s['stale_candidates']:3d}\n"
            f"{sep}"
        )
        if report.get("note"):
            print(f" NOTE: {report['note']}")

        archive = report["archive_recommendations"]
        if archive:
            print("\n Archive candidates (age >=30d, score <3):")
            for e in archive[:10]:
                flags = ", ".join(e["staleness_flags"]) or "none"
                print(
                    f" {e['path']:<40} "
                    f"score={e['score']:>6.2f} "
                    f"age={e['age_days']:>5.0f}d "
                    f"flags=[{flags}]"
                )
            if len(archive) > 10:
                print(f" ... and {len(archive) - 10} more")
        print()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _read_file(self, rel_path: str) -> str:
        """Return the UTF-8 text of *rel_path* under the workspace, or "".

        Reading is best-effort: a missing/unreadable file, undecodable
        bytes, or an unusable path value yields an empty string so one bad
        index row cannot abort the whole scoring run.
        """
        try:
            return (self._workspace / rel_path).read_text(encoding="utf-8")
        except (OSError, ValueError, TypeError):
            # OSError: missing or unreadable file.
            # ValueError: UnicodeDecodeError, or a NUL byte in the path.
            # TypeError: path is not a string (e.g. NULL in the index).
            return ""

    def _access_counts(
        self, db: sqlite3.Connection, cutoff_ms: int
    ) -> Dict[str, int]:
        """Return ``{path: access count}`` for the lookback window.

        Only paths with at least one access at or after *cutoff_ms*
        appear. Returns an empty dict when the access-log table cannot be
        queried (it does not exist before the schema migration).
        """
        try:
            rows = db.execute(
                "SELECT path, COUNT(*) AS n FROM memory_access_log "
                "WHERE accessed_at >= ? GROUP BY path",
                (cutoff_ms,),
            ).fetchall()
        except sqlite3.OperationalError:
            return {}
        # Positional access works with and without sqlite3.Row.
        return {row[0]: row[1] for row in rows}

    def _total_access_count(
        self, db: sqlite3.Connection, cutoff_ms: int
    ) -> int:
        """Total accesses across all paths in the lookback window.

        Expects ``db.row_factory`` to be ``sqlite3.Row``. Returns 0 when
        the access-log table cannot be queried.
        """
        try:
            row = db.execute(
                "SELECT COUNT(*) AS n FROM memory_access_log WHERE accessed_at >= ?",
                (cutoff_ms,),
            ).fetchone()
            return row["n"] if row else 0
        except sqlite3.OperationalError:
            return 0

    def _access_count(
        self, db: sqlite3.Connection, path: str, cutoff_ms: int
    ) -> int:
        """Accesses of a single *path* in the lookback window.

        Expects ``db.row_factory`` to be ``sqlite3.Row``.
        """
        try:
            row = db.execute(
                "SELECT COUNT(*) AS n FROM memory_access_log "
                "WHERE path = ? AND accessed_at >= ?",
                (path, cutoff_ms),
            ).fetchone()
            return row["n"] if row else 0
        except sqlite3.OperationalError:
            # Table doesn't exist yet on very first run before schema migration
            return 0

    def _age_days(
        self, path: str, mtime_ms: int, today: date
    ) -> float:
        """Age in days — prefer date extracted from filename for daily logs.

        Falls back to the indexed mtime when the path does not end in a
        valid ``YYYY-MM-DD.md`` name. The result is never negative: a
        future-dated filename or a skewed mtime counts as age 0 instead of
        earning a score bonus.
        """
        m = _RE_DAILY_NAME.search(path)
        if m:
            try:
                file_date = date(
                    int(m.group(1)), int(m.group(2)), int(m.group(3))
                )
            except ValueError:
                # Digits that do not form a real calendar date.
                file_date = None
            if file_date is not None:
                return max(0.0, float((today - file_date).days))
        return max(0.0, (time.time() - mtime_ms / 1000) / 86400)

    def _staleness_risk(
        self,
        content: str,
        today: date,
        flags: Optional[List[str]] = None,
    ) -> float:
        """0.0–3.0 staleness score from content heuristics.

        Args:
            content: File text to inspect.
            today: Reference date for the "old date mentioned" check.
            flags: Result of ``_staleness_flags(content)`` if the caller
                already has it; computed here when omitted.
        """
        if flags is None:
            flags = self._staleness_flags(content)
        # Float start value keeps the result a float even with no flags.
        score = sum(
            (self._STALENESS_WEIGHTS.get(flag, 0.0) for flag in flags), 0.0
        )

        # Past dates mentioned in content (more than 30 days ago)
        for m in _RE_DATE.finditer(content):
            try:
                mentioned = date(
                    int(m.group(1)), int(m.group(2)), int(m.group(3))
                )
            except ValueError:
                continue
            if (today - mentioned).days > self._OLD_DATE_DAYS:
                score += self._OLD_DATE_PENALTY
                break  # Only penalise once per file
        return min(score, self._MAX_STALENESS)

    def _staleness_flags(self, content: str) -> List[str]:
        """Return the names of the staleness patterns found in *content*.

        Flag order is fixed: ip_addresses, credentials, status_references,
        version_numbers.
        """
        checks = (
            ("ip_addresses", _RE_IP),
            ("credentials", _RE_CREDENTIALS),
            ("status_references", _RE_STATUS),
            ("version_numbers", _RE_VERSION),
        )
        return [name for name, pattern in checks if pattern.search(content)]

    @staticmethod
    def _influence_proxy(access_count: int) -> float:
        """Proxy for influence rate — no real data until access log fills.

        Maps the access count to a fixed step: >=5 -> 0.8, 2-4 -> 0.5,
        exactly 1 -> 0.3, anything else -> 0.0.
        """
        if access_count >= 5:
            return 0.8
        if access_count >= 2:
            return 0.5
        if access_count == 1:
            return 0.3
        return 0.0
# ---------------------------------------------------------------------------
# Pure functions
# ---------------------------------------------------------------------------
def _tier(score: float) -> str:
    """Map a relevance score to its tier name.

    Scores above 8 are "core", 3 to 8 inclusive are "active", 0 up to
    (but not including) 3 are "archive", and everything else is "stale".
    """
    if score > 8:
        return "core"
    # Remaining tiers share the same "at or above the floor" rule.
    for floor, label in ((3, "active"), (0, "archive")):
        if score >= floor:
            return label
    return "stale"
def _recommendation(tier: str, age_days: float) -> str:
    """Turn a tier and file age into "keep", "monitor" or "archive".

    Core and active files are kept. Archive-tier files are only monitored
    until they reach 60 days of age. Anything else, including the stale
    tier, is archived rather than deleted (Phase 3 safety rule).
    """
    if tier == "core" or tier == "active":
        return "keep"
    if tier == "archive" and not age_days >= 60:
        return "monitor"
    return "archive"
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import sys

    # The first CLI argument, when given, overrides the default workspace.
    cli_args = sys.argv[1:]
    workspace_dir = cli_args[0] if cli_args else "./memory_workspace"

    # Print the table to stdout, then persist the JSON report.
    scorer = MemoryRelevanceScorer(workspace_dir)
    scorer.print_summary()
    report_path = scorer.write_report()
    print(f"Full report: {report_path}")