Core agent improvements: - RSO (Relevance Scoring & Observation) system: interaction_logger, memory_scorer, signal_detector - Memory access logging (memory_access_log table) for relevance scoring; high-signal turn detection - Rich conversation storage for notable turns; compact_conversation truncates long user messages - Task-type classifier (query/action/analysis/creative) for observation tagging - Nested sub-agent visibility: deep delegations now register against the main agent's manager Child safety (Gabriel profile): - child_safety.py: filtering, audit logging, prompt constants for restricted sessions - .kiro/specs/child-safety-profile: requirements, design, tasks specs - GABRIEL_BOT_PROPOSAL.md: initial proposal doc - Reduced context window (10 msgs) and tutor-mode identity for restricted users Telegram adapter: - Polling watchdog: auto-restarts updater if polling drops unexpectedly - get_me() with exponential-backoff retry on NetworkError at startup - Correct stop() ordering: signal watchdog before cancelling tasks Email / Gmail: - send_email: supports file attachments (attachments list param) - get_email: surfaces attachment metadata in response Scheduled tasks / weather: - Remove OpenWeatherMap API calls from morning-weather task; use wttr.in exclusively - New scheduled tasks and scheduler state persistence Discord: - adapters/discord/__init__.py scaffold - discord-plugin: MCP plugin for Claude Code Discord integration (server.ts, skills, config) Infrastructure: - n8n workflow exports (garvis_webhook, content_pipeline variants) - memory_workspace: context, homelab-repo-updates, weekly observation summaries, error logs - UCS C240 migration plan doc - requirements.txt: new deps - .claude/settings.json, fix_hooks.py: hook/permission tuning
112 lines
4.2 KiB
Python
112 lines
4.2 KiB
Python
"""
|
|
Interaction Logger — JSONL-based observation log for RSO Phase 1.
|
|
|
|
Writes are performed on daemon background threads so logging never
|
|
blocks response delivery. All log files live under:
|
|
|
|
memory_workspace/observation/logs/YYYY-MM-DD.jsonl
|
|
memory_workspace/observation/errors/YYYY-MM-DD.jsonl
|
|
|
|
Sub-agents MUST NOT instantiate this class. Only the main Agent
|
|
(is_sub_agent=False) creates and uses an InteractionLogger.
|
|
"""
|
|
|
|
import json
|
|
import threading
|
|
import time
|
|
from datetime import date
|
|
from datetime import datetime
|
|
from datetime import timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from typing import Dict
|
|
from typing import Optional
|
|
|
|
|
|
class InteractionLogger:
|
|
"""Thread-safe, async JSONL interaction logger."""
|
|
|
|
def __init__(self, workspace_dir: Path) -> None:
|
|
self._base = Path(workspace_dir) / "observation"
|
|
self._logs_dir = self._base / "logs"
|
|
self._errors_dir = self._base / "errors"
|
|
self._summaries_dir = self._base / "summaries"
|
|
|
|
# Create directories eagerly — they must exist before the first
|
|
# background write fires.
|
|
for d in (self._logs_dir, self._errors_dir, self._summaries_dir):
|
|
d.mkdir(parents=True, exist_ok=True)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Public API
|
|
# ------------------------------------------------------------------
|
|
|
|
def log_interaction(self, entry: Dict[str, Any]) -> None:
|
|
"""Append an interaction entry to today's JSONL log (non-blocking)."""
|
|
path = self._logs_dir / f"{date.today().isoformat()}.jsonl"
|
|
self._fire_and_forget(path, entry)
|
|
|
|
def log_error(self, entry: Dict[str, Any]) -> None:
|
|
"""Append a structured error entry to today's error JSONL (non-blocking)."""
|
|
path = self._errors_dir / f"{date.today().isoformat()}.jsonl"
|
|
self._fire_and_forget(path, entry)
|
|
|
|
def update_signal(
|
|
self,
|
|
interaction_id: str,
|
|
signal_dict: Dict[str, Any],
|
|
) -> None:
|
|
"""Append a signal-patch record referencing a prior interaction.
|
|
|
|
Rather than mutating the original record (which would require a
|
|
read-rewrite that is neither atomic nor safe under concurrent
|
|
access), we append a lightweight patch record. The analysis
|
|
layer merges patches when it reads the log.
|
|
"""
|
|
patch = {
|
|
"record_type": "signal_patch",
|
|
"interaction_id": interaction_id,
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"signal": signal_dict,
|
|
}
|
|
self.log_interaction(patch)
|
|
|
|
def cleanup_old_logs(self, retention_days: int = 90) -> None:
|
|
"""Delete JSONL files older than retention_days.
|
|
|
|
Called synchronously at agent startup — not on the hot path.
|
|
"""
|
|
cutoff = time.time() - (retention_days * 86400)
|
|
for directory in (self._logs_dir, self._errors_dir):
|
|
for f in directory.glob("*.jsonl"):
|
|
try:
|
|
if f.stat().st_mtime < cutoff:
|
|
f.unlink()
|
|
print(f"[ObsLogger] Deleted old log: {f.name}")
|
|
except OSError as e:
|
|
print(f"[ObsLogger] Could not delete {f}: {e}")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _fire_and_forget(self, path: Path, record: Dict[str, Any]) -> None:
|
|
"""Launch a daemon thread to append one JSON line to *path*."""
|
|
t = threading.Thread(
|
|
target=self._append_jsonl,
|
|
args=(path, record),
|
|
daemon=True,
|
|
)
|
|
t.start()
|
|
|
|
@staticmethod
|
|
def _append_jsonl(path: Path, record: Dict[str, Any]) -> None:
|
|
"""Append one JSON line. Called only from background threads."""
|
|
try:
|
|
line = json.dumps(record, default=str, ensure_ascii=False)
|
|
with open(path, "a", encoding="utf-8") as fh:
|
|
fh.write(line + "\n")
|
|
except Exception as e:
|
|
# Last-resort console output — never raises back to caller.
|
|
print(f"[ObsLogger] Write failed ({path.name}): {e}")
|