ajarbot/memory_system.py
Commit 8afff96bb5 by Jordan Ramos: Add API usage tracking and dynamic task reloading
Features:
- Usage tracking system (usage_tracker.py)
  - Tracks input/output tokens per API call
  - Calculates costs with support for cache pricing
  - Stores data in usage_data.json (gitignored)
  - Integrated into llm_interface.py

- Dynamic task scheduler reloading
  - Auto-detects YAML changes every 60s
  - No restart needed for new tasks
  - reload_tasks() method for manual refresh

- Example cost tracking scheduled task
  - Daily API usage report
  - Budget tracking ($5/month target)
  - Disabled by default in scheduled_tasks.yaml

Improvements:
- Fixed tool_use/tool_result pair splitting bug (CRITICAL)
- Added thread safety to agent.chat()
- Fixed N+1 query problem in hybrid search
- Optimized database batch queries
- Added conversation history pruning (50 messages max)

Updated .gitignore:
- Exclude user profiles (memory_workspace/users/*.md)
- Exclude usage data (usage_data.json)
- Exclude vector index (vectors.usearch)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-13 23:38:44 -07:00

"""
Simple Memory System - SQLite + Markdown.
Inspired by OpenClaw's memory implementation but simplified.
"""
import hashlib
import sqlite3
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np
from fastembed import TextEmbedding
from usearch.index import Index
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
# Default chunk size for splitting markdown into indexable segments
DEFAULT_CHUNK_SIZE = 500
# Hash prefix length for content fingerprinting
HASH_PREFIX_LENGTH = 16
# Default SOUL.md template for new workspaces
_SOUL_TEMPLATE = """\
# SOUL - Agent Personality
## Core Identity
- I am a helpful, knowledgeable assistant
- I value clarity, accuracy, and user experience
## Communication Style
- Be concise but thorough
- Use examples when helpful
- Ask clarifying questions when needed
## Preferences
- Prefer simple, maintainable solutions
- Document important decisions
- Learn from interactions
## Memory Usage
- Store important facts in MEMORY.md
- Track daily activities in memory/YYYY-MM-DD.md
- Remember user preferences in users/[username].md
"""
# Default user profile template
_USER_TEMPLATE = """\
# User: default
## Preferences
- Communication style: professional
- Detail level: moderate
- Timezone: UTC
## Context
- Projects: []
- Interests: []
- Goals: []
## Notes
(Add user-specific notes here)
"""


class MemorySystem:
    """Simple memory system using SQLite for indexing and Markdown for storage."""

    def __init__(self, workspace_dir: str = "./memory_workspace") -> None:
        self.workspace_dir = Path(workspace_dir)
        self.workspace_dir.mkdir(exist_ok=True)
        self.memory_dir = self.workspace_dir / "memory"
        self.memory_dir.mkdir(exist_ok=True)
        self.users_dir = self.workspace_dir / "users"
        self.users_dir.mkdir(exist_ok=True)
        self.db_path = self.workspace_dir / "memory_index.db"
        # Allow cross-thread usage for async runtime compatibility
        self.db = sqlite3.connect(str(self.db_path), check_same_thread=False)
        self.db.row_factory = sqlite3.Row
        self._init_schema()
        self._init_special_files()
        # Initialize embedding model (384-dim, local, $0 cost)
        print("Loading FastEmbed model...")
        self.embedding_model = TextEmbedding(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        # Initialize vector index
        self.vector_index_path = self.workspace_dir / "vectors.usearch"
        self.vector_index = Index(
            ndim=384,  # all-MiniLM-L6-v2 dimensionality
            metric="cos",  # cosine similarity
        )
        # Load existing index if present
        if self.vector_index_path.exists():
            self.vector_index.load(str(self.vector_index_path))
            print(f"Loaded {len(self.vector_index)} vectors from index")
        else:
            print("Created new vector index")
        self.observer: Optional[Observer] = None
        self.dirty = False

    def _init_schema(self) -> None:
        """Create database tables."""
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS meta (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )
        """)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS files (
                path TEXT PRIMARY KEY,
                hash TEXT NOT NULL,
                mtime INTEGER NOT NULL,
                size INTEGER NOT NULL
            )
        """)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS chunks (
                id TEXT PRIMARY KEY,
                path TEXT NOT NULL,
                start_line INTEGER NOT NULL,
                end_line INTEGER NOT NULL,
                text TEXT NOT NULL,
                updated_at INTEGER NOT NULL,
                vector_id INTEGER
            )
        """)
        self.db.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts
            USING fts5(
                text,
                path UNINDEXED,
                start_line UNINDEXED,
                end_line UNINDEXED
            )
        """)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                description TEXT,
                status TEXT DEFAULT 'pending',
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL,
                metadata TEXT
            )
        """)
        self.db.execute(
            "CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)"
        )
        # Migration: Add vector_id column if it doesn't exist
        try:
            self.db.execute("ALTER TABLE chunks ADD COLUMN vector_id INTEGER")
            print("Added vector_id column to chunks table")
        except sqlite3.OperationalError:
            # Column already exists
            pass
        self.db.commit()

    def _init_special_files(self) -> None:
        """Initialize SOUL.md and default user if they don't exist."""
        soul_file = self.workspace_dir / "SOUL.md"
        if not soul_file.exists():
            soul_file.write_text(_SOUL_TEMPLATE, encoding="utf-8")
            print("Created SOUL.md")
        default_user = self.users_dir / "default.md"
        if not default_user.exists():
            default_user.write_text(_USER_TEMPLATE, encoding="utf-8")
            print("Created users/default.md")

    @staticmethod
    def _hash_text(text: str) -> str:
        """Create a truncated SHA-256 hash of text content."""
        return hashlib.sha256(text.encode()).hexdigest()[:HASH_PREFIX_LENGTH]

    @staticmethod
    def _chunk_markdown(
        content: str, chunk_size: int = DEFAULT_CHUNK_SIZE
    ) -> List[Dict]:
        """Split markdown into chunks by paragraphs."""
        lines = content.split("\n")
        chunks: List[Dict] = []
        current_chunk: List[str] = []
        current_start = 1
        for i, line in enumerate(lines, 1):
            current_chunk.append(line)
            is_break = not line.strip()
            is_too_large = len("\n".join(current_chunk)) >= chunk_size
            if is_break or is_too_large:
                text = "\n".join(current_chunk).strip()
                if text:
                    chunks.append({
                        "text": text,
                        "start_line": current_start,
                        "end_line": i,
                    })
                current_chunk = []
                current_start = i + 1
        # Add remaining chunk
        if current_chunk:
            text = "\n".join(current_chunk).strip()
            if text:
                chunks.append({
                    "text": text,
                    "start_line": current_start,
                    "end_line": len(lines),
                })
        return chunks
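
    # Illustrative example of the chunking above (comment only, not executed):
    #   _chunk_markdown("# Title\nSome text\n\nMore text") yields two chunks:
    #     {"text": "# Title\nSome text", "start_line": 1, "end_line": 3}
    #     {"text": "More text", "start_line": 4, "end_line": 4}
    # A blank line closes the current chunk, and a chunk is also flushed once
    # its joined text reaches DEFAULT_CHUNK_SIZE characters.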

    def index_file(self, file_path: Path) -> None:
        """Index a markdown file."""
        if not file_path.exists() or file_path.suffix != ".md":
            return
        stat = file_path.stat()
        rel_path = str(file_path.relative_to(self.workspace_dir))
        content = file_path.read_text(encoding="utf-8")
        file_hash = self._hash_text(content)
        # Check if file needs reindexing
        existing = self.db.execute(
            "SELECT hash FROM files WHERE path = ?", (rel_path,)
        ).fetchone()
        if existing and existing["hash"] == file_hash:
            return  # File unchanged
        # Remove old chunks and their vectors
        old_chunks = self.db.execute(
            "SELECT vector_id FROM chunks WHERE path = ?", (rel_path,)
        ).fetchall()
        # Remove vectors from index
        for row in old_chunks:
            if row["vector_id"] is not None:
                try:
                    self.vector_index.remove(row["vector_id"])
                except (KeyError, IndexError):
                    pass  # Vector might not exist in index, safe to ignore
        # Remove from database
        self.db.execute(
            "DELETE FROM chunks WHERE path = ?", (rel_path,)
        )
        self.db.execute(
            "DELETE FROM chunks_fts WHERE path = ?", (rel_path,)
        )
        # Create new chunks
        chunks = self._chunk_markdown(content)
        now = int(time.time() * 1000)
        for chunk in chunks:
            chunk_id = self._hash_text(
                f"{rel_path}:{chunk['start_line']}:"
                f"{chunk['end_line']}:{chunk['text']}"
            )
            # Generate embedding and store in vector index
            embedding = self._generate_embedding(chunk["text"])
            # Use hash of chunk_id as unique integer key for usearch
            vector_id = int(hashlib.sha256(chunk_id.encode()).hexdigest()[:15], 16)
            self.vector_index.add(vector_id, embedding)
            self.db.execute(
                """
                INSERT OR REPLACE INTO chunks
                (id, path, start_line, end_line, text, updated_at, vector_id)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    chunk_id,
                    rel_path,
                    chunk["start_line"],
                    chunk["end_line"],
                    chunk["text"],
                    now,
                    vector_id,
                ),
            )
            self.db.execute(
                """
                INSERT INTO chunks_fts (text, path, start_line, end_line)
                VALUES (?, ?, ?, ?)
                """,
                (
                    chunk["text"],
                    rel_path,
                    chunk["start_line"],
                    chunk["end_line"],
                ),
            )
        # Update file record
        self.db.execute(
            """
            INSERT OR REPLACE INTO files (path, hash, mtime, size)
            VALUES (?, ?, ?, ?)
            """,
            (rel_path, file_hash, int(stat.st_mtime * 1000), stat.st_size),
        )
        self.db.commit()
        # Save vector index to disk
        self.vector_index.save(str(self.vector_index_path))
        print(f"Indexed {rel_path} ({len(chunks)} chunks)")

    def sync(self) -> None:
        """Sync all markdown files in workspace."""
        print("\nSyncing memory files...")
        soul_file = self.workspace_dir / "SOUL.md"
        if soul_file.exists():
            self.index_file(soul_file)
        memory_file = self.workspace_dir / "MEMORY.md"
        if memory_file.exists():
            self.index_file(memory_file)
        for user_file in self.users_dir.glob("*.md"):
            self.index_file(user_file)
        for md_file in self.memory_dir.glob("*.md"):
            self.index_file(md_file)
        self.dirty = False
        print("Sync complete!\n")

    @staticmethod
    def _sanitize_fts5_query(query: str) -> str:
        """Sanitize query string for FTS5 MATCH to prevent injection."""
        # Remove or escape FTS5 special characters
        # Wrap in quotes to treat as phrase search
        sanitized = query.replace('"', '""')  # Escape double quotes
        return f'"{sanitized}"'

    def _generate_embedding(self, text: str) -> np.ndarray:
        """Generate 384-dim embedding using FastEmbed (local, $0 cost)."""
        # FastEmbed returns a generator, get first (and only) result
        embeddings = list(self.embedding_model.embed([text]))
        return embeddings[0]
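
    # Usage sketch (comment only): the returned value is a single
    # 384-dimensional float vector, e.g.
    #   vec = self._generate_embedding("daily standup notes")
    #   vec.shape  # -> (384,)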

    def search(self, query: str, max_results: int = 5) -> List[Dict]:
        """Search memory using full-text search."""
        # Sanitize query to prevent FTS5 injection
        safe_query = self._sanitize_fts5_query(query)
        results = self.db.execute(
            """
            SELECT
                chunks.path,
                chunks.start_line,
                chunks.end_line,
                snippet(chunks_fts, 0, '**', '**', '...', 64) as snippet,
                bm25(chunks_fts) as score
            FROM chunks_fts
            JOIN chunks ON chunks.path = chunks_fts.path
                AND chunks.start_line = chunks_fts.start_line
            WHERE chunks_fts MATCH ?
            ORDER BY score
            LIMIT ?
            """,
            (safe_query, max_results),
        ).fetchall()
        return [dict(row) for row in results]
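
    # Usage sketch (comment only; see also the __main__ demo at the bottom):
    #   for hit in memory.search("sqlite", max_results=3):
    #       print(hit["path"], hit["start_line"], hit["snippet"], hit["score"])
    # Scores come from bm25(), where lower (more negative) means a better match.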

    def search_hybrid(self, query: str, max_results: int = 5) -> List[Dict]:
        """
        Hybrid search combining semantic (vector) and keyword (BM25) search.
        Uses 0.7 vector similarity + 0.3 BM25 scoring for optimal retrieval.
        """
        if len(self.vector_index) == 0:
            # No vectors yet, fall back to keyword search
            return self.search(query, max_results)
        # 1. Generate query embedding for semantic search
        query_embedding = self._generate_embedding(query)
        # 2. Get top vector matches (retrieve more for re-ranking)
        vector_matches = self.vector_index.search(
            query_embedding, max_results * 3
        )
        # 3. Get BM25 keyword matches
        safe_query = self._sanitize_fts5_query(query)
        bm25_results = self.db.execute(
            """
            SELECT
                chunks.id,
                chunks.path,
                chunks.start_line,
                chunks.end_line,
                chunks.vector_id,
                snippet(chunks_fts, 0, '**', '**', '...', 64) as snippet,
                bm25(chunks_fts) as bm25_score
            FROM chunks_fts
            JOIN chunks ON chunks.path = chunks_fts.path
                AND chunks.start_line = chunks_fts.start_line
            WHERE chunks_fts MATCH ?
            LIMIT ?
            """,
            (safe_query, max_results * 3),
        ).fetchall()
        # 4. Normalize scores and combine
        # Build maps for efficient lookup
        vector_scores = {}
        for match in vector_matches:
            # usearch returns (key, distance) tuples
            vector_id = int(match.key)
            # Convert distance to similarity (cosine distance -> similarity)
            similarity = 1 - match.distance
            vector_scores[vector_id] = similarity
        bm25_map = {}
        for row in bm25_results:
            bm25_map[row["id"]] = dict(row)
        # Normalize BM25 scores (they're negative, lower is better)
        if bm25_results:
            bm25_values = [row["bm25_score"] for row in bm25_results]
            min_bm25 = min(bm25_values)
            max_bm25 = max(bm25_values)
            bm25_range = max_bm25 - min_bm25 if max_bm25 != min_bm25 else 1
            for chunk_id, chunk_data in bm25_map.items():
                # Normalize to 0-1, then invert (lower BM25 is better)
                normalized = (chunk_data["bm25_score"] - min_bm25) / bm25_range
                bm25_map[chunk_id]["normalized_bm25"] = 1 - normalized
        # 5. Combine scores: 0.7 vector + 0.3 BM25
        combined_scores = {}
        # Batch-fetch all chunks matching vector results in a single query
        # instead of N separate queries (fixes N+1 query problem)
        vector_id_list = [int(match.key) for match in vector_matches]
        vector_chunk_map = {}  # vector_id -> chunk data
        if vector_id_list:
            placeholders = ",".join("?" * len(vector_id_list))
            vector_chunks = self.db.execute(
                f"SELECT * FROM chunks WHERE vector_id IN ({placeholders})",
                vector_id_list,
            ).fetchall()
            for row in vector_chunks:
                vector_chunk_map[row["vector_id"]] = dict(row)
        # Collect all unique chunk IDs from both sources
        all_chunk_ids = set()
        for vid, chunk_data in vector_chunk_map.items():
            all_chunk_ids.add(chunk_data["id"])
        all_chunk_ids.update(bm25_map.keys())
        # Batch-fetch any chunk data we don't already have
        chunks_we_have = {cd["id"] for cd in vector_chunk_map.values()}
        chunks_we_have.update(bm25_map.keys())
        missing_ids = all_chunk_ids - chunks_we_have
        all_chunk_data = {}
        # Index data we already have from vector query
        for chunk_data in vector_chunk_map.values():
            all_chunk_data[chunk_data["id"]] = chunk_data
        # Index data from BM25 results
        for chunk_id, bm25_data in bm25_map.items():
            if chunk_id not in all_chunk_data:
                all_chunk_data[chunk_id] = bm25_data
        # Fetch any remaining missing chunks in one query
        if missing_ids:
            placeholders = ",".join("?" * len(missing_ids))
            missing_chunks = self.db.execute(
                f"SELECT * FROM chunks WHERE id IN ({placeholders})",
                list(missing_ids),
            ).fetchall()
            for row in missing_chunks:
                all_chunk_data[row["id"]] = dict(row)
        # Calculate combined scores
        for chunk_id in all_chunk_ids:
            chunk_data = all_chunk_data.get(chunk_id)
            if not chunk_data:
                continue
            vector_id = chunk_data.get("vector_id")
            vector_score = vector_scores.get(vector_id, 0.0) if vector_id else 0.0
            bm25_score = bm25_map.get(chunk_id, {}).get("normalized_bm25", 0.0)
            # Weighted combination: 70% semantic, 30% keyword
            combined = 0.7 * vector_score + 0.3 * bm25_score
            snippet_text = chunk_data.get("text", "")
            combined_scores[chunk_id] = {
                "path": chunk_data["path"],
                "start_line": chunk_data["start_line"],
                "end_line": chunk_data["end_line"],
                "snippet": bm25_map.get(chunk_id, {}).get(
                    "snippet",
                    snippet_text[:64] + "..."
                    if len(snippet_text) > 64
                    else snippet_text,
                ),
                "score": combined,
            }
        # 6. Sort by combined score and return top results
        sorted_results = sorted(
            combined_scores.values(),
            key=lambda x: x["score"],
            reverse=True
        )
        return sorted_results[:max_results]
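
    # Worked example of the weighting above (comment only): if a chunk has
    # cosine similarity 0.80 to the query and a normalized BM25 score of 0.50,
    # its combined score is 0.7 * 0.80 + 0.3 * 0.50 = 0.71. A chunk found by
    # only one of the two searches simply scores 0.0 on the missing component.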

    def write_memory(self, content: str, daily: bool = True) -> None:
        """Write to memory file."""
        if daily:
            today = datetime.now().strftime("%Y-%m-%d")
            file_path = self.memory_dir / f"{today}.md"
        else:
            file_path = self.workspace_dir / "MEMORY.md"
        if file_path.exists():
            existing = file_path.read_text(encoding="utf-8")
            content = f"{existing}\n\n{content}"
        file_path.write_text(content, encoding="utf-8")
        self.index_file(file_path)
        print(f"Written to {file_path.name}")

    def update_soul(self, content: str, append: bool = False) -> None:
        """Update SOUL.md (agent personality)."""
        soul_file = self.workspace_dir / "SOUL.md"
        if append and soul_file.exists():
            existing = soul_file.read_text(encoding="utf-8")
            content = f"{existing}\n\n{content}"
        soul_file.write_text(content, encoding="utf-8")
        self.index_file(soul_file)
        print("Updated SOUL.md")

    def update_user(
        self, username: str, content: str, append: bool = False
    ) -> None:
        """Update user-specific memory."""
        # Validate username to prevent path traversal
        if not username or not username.replace("-", "").replace("_", "").isalnum():
            raise ValueError(
                "Invalid username: must contain only alphanumeric, "
                "hyphens, and underscores"
            )
        user_file = self.users_dir / f"{username}.md"
        # Verify the resolved path is within users_dir
        try:
            resolved = user_file.resolve()
            if not resolved.is_relative_to(self.users_dir.resolve()):
                raise ValueError("Path traversal detected in username")
        except (ValueError, OSError) as e:
            raise ValueError(f"Invalid username path: {e}")
        if append and user_file.exists():
            existing = user_file.read_text(encoding="utf-8")
            content = f"{existing}\n\n{content}"
        elif not user_file.exists():
            content = f"# User: {username}\n\n{content}"
        user_file.write_text(content, encoding="utf-8")
        self.index_file(user_file)
        print(f"Updated users/{username}.md")

    def get_soul(self) -> str:
        """Get SOUL.md content."""
        soul_file = self.workspace_dir / "SOUL.md"
        if soul_file.exists():
            return soul_file.read_text(encoding="utf-8")
        return ""

    def get_user(self, username: str) -> str:
        """Get user-specific content."""
        # Validate username to prevent path traversal
        if not username or not username.replace("-", "").replace("_", "").isalnum():
            raise ValueError(
                "Invalid username: must contain only alphanumeric, "
                "hyphens, and underscores"
            )
        user_file = self.users_dir / f"{username}.md"
        # Verify the resolved path is within users_dir
        try:
            resolved = user_file.resolve()
            if not resolved.is_relative_to(self.users_dir.resolve()):
                raise ValueError("Path traversal detected in username")
        except (ValueError, OSError) as e:
            raise ValueError(f"Invalid username path: {e}")
        if user_file.exists():
            return user_file.read_text(encoding="utf-8")
        return ""

    def list_users(self) -> List[str]:
        """List all users with memory files."""
        return [f.stem for f in self.users_dir.glob("*.md")]

    def search_user(
        self, username: str, query: str, max_results: int = 5
    ) -> List[Dict]:
        """Search within a specific user's memory."""
        # Validate username to prevent path traversal
        if not username or not username.replace("-", "").replace("_", "").isalnum():
            raise ValueError(
                "Invalid username: must contain only alphanumeric, "
                "hyphens, and underscores"
            )
        user_path = f"users/{username}.md"
        # Sanitize query to prevent FTS5 injection
        safe_query = self._sanitize_fts5_query(query)
        results = self.db.execute(
            """
            SELECT
                chunks.path,
                chunks.start_line,
                chunks.end_line,
                snippet(chunks_fts, 0, '**', '**', '...', 64) as snippet,
                bm25(chunks_fts) as score
            FROM chunks_fts
            JOIN chunks ON chunks.path = chunks_fts.path
                AND chunks.start_line = chunks_fts.start_line
            WHERE chunks_fts MATCH ? AND chunks.path = ?
            ORDER BY score
            LIMIT ?
            """,
            (safe_query, user_path, max_results),
        ).fetchall()
        return [dict(row) for row in results]

    def read_file(
        self,
        rel_path: str,
        from_line: Optional[int] = None,
        num_lines: Optional[int] = None,
    ) -> str:
        """Read content from a memory file."""
        file_path = self.workspace_dir / rel_path
        # Verify the resolved path is within workspace_dir
        try:
            resolved = file_path.resolve()
            if not resolved.is_relative_to(self.workspace_dir.resolve()):
                raise ValueError("Path traversal detected")
        except (ValueError, OSError) as e:
            raise ValueError(f"Invalid file path: {e}")
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {rel_path}")
        content = file_path.read_text(encoding="utf-8")
        if from_line is not None:
            lines = content.split("\n")
            start = max(0, from_line - 1)
            end = start + num_lines if num_lines else len(lines)
            return "\n".join(lines[start:end])
        return content
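
    # Usage sketch (comment only): read_file("memory/2026-02-13.md",
    # from_line=10, num_lines=5) returns lines 10-14 of that file; omitting
    # from_line returns the whole file.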

    def status(self) -> Dict:
        """Get memory system status."""
        files = self.db.execute(
            "SELECT COUNT(*) as count FROM files"
        ).fetchone()
        chunks = self.db.execute(
            "SELECT COUNT(*) as count FROM chunks"
        ).fetchone()
        return {
            "workspace": str(self.workspace_dir),
            "database": str(self.db_path),
            "files": files["count"],
            "chunks": chunks["count"],
            "dirty": self.dirty,
        }

    def start_watching(self) -> None:
        """Start file watcher for auto-sync."""

        class _MemoryFileHandler(FileSystemEventHandler):
            def __init__(self, memory_system: "MemorySystem") -> None:
                self.memory_system = memory_system

            def on_modified(self, event) -> None:
                if event.src_path.endswith(".md"):
                    self.memory_system.dirty = True
                    print(
                        f"Detected change: {Path(event.src_path).name}"
                    )

        self.observer = Observer()
        handler = _MemoryFileHandler(self)
        self.observer.schedule(
            handler, str(self.workspace_dir), recursive=True
        )
        self.observer.start()
        print(f"Watching {self.workspace_dir} for changes...")

    def stop_watching(self) -> None:
        """Stop file watcher."""
        if self.observer:
            self.observer.stop()
            self.observer.join()

    def add_task(
        self,
        title: str,
        description: str = "",
        metadata: Optional[Dict] = None,
    ) -> int:
        """Add task for tracking."""
        now = int(time.time() * 1000)
        cursor = self.db.execute(
            """
            INSERT INTO tasks
            (title, description, status, created_at, updated_at, metadata)
            VALUES (?, ?, 'pending', ?, ?, ?)
            """,
            (title, description, now, now, str(metadata or {})),
        )
        self.db.commit()
        return cursor.lastrowid

    def update_task(
        self,
        task_id: int,
        status: Optional[str] = None,
        description: Optional[str] = None,
    ) -> None:
        """Update task status or description."""
        now = int(time.time() * 1000)
        updates = ["updated_at = ?"]
        params: list = [now]
        if status:
            updates.append("status = ?")
            params.append(status)
        if description:
            updates.append("description = ?")
            params.append(description)
        params.append(task_id)
        self.db.execute(
            f"UPDATE tasks SET {', '.join(updates)} WHERE id = ?",
            params,
        )
        self.db.commit()

    def get_tasks(self, status: Optional[str] = None) -> List[Dict]:
        """Get tasks, optionally filtered by status."""
        if status:
            rows = self.db.execute(
                "SELECT * FROM tasks WHERE status = ? "
                "ORDER BY created_at DESC",
                (status,),
            ).fetchall()
        else:
            rows = self.db.execute(
                "SELECT * FROM tasks ORDER BY created_at DESC"
            ).fetchall()
        return [dict(row) for row in rows]
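
    # Usage sketch for the task helpers (comment only):
    #   task_id = memory.add_task("Summarize usage report", "daily cost check")
    #   memory.update_task(task_id, status="done")
    #   done = memory.get_tasks(status="done")
    # Tasks live in the same SQLite database as the chunk index.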

    def close(self) -> None:
        """Close database and cleanup."""
        self.stop_watching()
        # Save vector index before closing
        if len(self.vector_index) > 0:
            self.vector_index.save(str(self.vector_index_path))
        self.db.close()


if __name__ == "__main__":
    memory = MemorySystem()
    memory.sync()

    memory.update_soul(
        """
## Learning Style
- I learn from each interaction
- I adapt to user preferences
- I maintain consistency in my personality
""",
        append=True,
    )

    memory.update_user(
        "alice",
        """
## Preferences
- Likes detailed technical explanations
- Working on Python projects
- Prefers morning work sessions
## Current Projects
- Building a memory system
- Learning SQLite FTS5
""",
    )

    memory.update_user(
        "bob",
        """
## Preferences
- Prefers concise answers
- JavaScript developer
- Works late nights
## Current Focus
- React application
- API integration
""",
    )

    memory.write_memory(
        """
# Project Setup Notes
- Using SQLite for fast indexing
- Markdown files are the source of truth
- Daily logs in memory/YYYY-MM-DD.md
- Long-term notes in MEMORY.md
- SOUL.md defines agent personality
- users/*.md for user-specific context
""",
        daily=False,
    )

    memory.write_memory(
        """
## Today's Progress
- Implemented basic memory system
- Added full-text search with FTS5
- Added SOUL.md and user files
- File watching works great
""",
        daily=True,
    )

    print("\nSearching for 'sqlite':")
    results = memory.search("sqlite")
    for result in results:
        print(
            f"\n{result['path']}:{result['start_line']}-"
            f"{result['end_line']}"
        )
        print(f" {result['snippet']}")
        print(f" (score: {result['score']:.2f})")

    print("\n\nSearching Alice's memory for 'python':")
    alice_results = memory.search_user("alice", "python")
    for result in alice_results:
        print(
            f"\n{result['path']}:{result['start_line']}-"
            f"{result['end_line']}"
        )
        print(f" {result['snippet']}")

    print("\n\nSOUL Content Preview:")
    soul = memory.get_soul()
    print(soul[:200] + "...")

    print(f"\n\nUsers with memory: {', '.join(memory.list_users())}")

    print("\nMemory Status:")
    status = memory.status()
    for key, value in status.items():
        print(f" {key}: {value}")

    memory.close()