""" Simple Memory System - SQLite + Markdown. Inspired by OpenClaw's memory implementation but simplified. """ import hashlib import sqlite3 import time from datetime import datetime from pathlib import Path from typing import Dict, List, Optional from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer # Default chunk size for splitting markdown into indexable segments DEFAULT_CHUNK_SIZE = 500 # Hash prefix length for content fingerprinting HASH_PREFIX_LENGTH = 16 # Default SOUL.md template for new workspaces _SOUL_TEMPLATE = """\ # SOUL - Agent Personality ## Core Identity - I am a helpful, knowledgeable assistant - I value clarity, accuracy, and user experience ## Communication Style - Be concise but thorough - Use examples when helpful - Ask clarifying questions when needed ## Preferences - Prefer simple, maintainable solutions - Document important decisions - Learn from interactions ## Memory Usage - Store important facts in MEMORY.md - Track daily activities in memory/YYYY-MM-DD.md - Remember user preferences in users/[username].md """ # Default user profile template _USER_TEMPLATE = """\ # User: default ## Preferences - Communication style: professional - Detail level: moderate - Timezone: UTC ## Context - Projects: [] - Interests: [] - Goals: [] ## Notes (Add user-specific notes here) """ class MemorySystem: """Simple memory system using SQLite for indexing and Markdown for storage.""" def __init__(self, workspace_dir: str = "./memory_workspace") -> None: self.workspace_dir = Path(workspace_dir) self.workspace_dir.mkdir(exist_ok=True) self.memory_dir = self.workspace_dir / "memory" self.memory_dir.mkdir(exist_ok=True) self.users_dir = self.workspace_dir / "users" self.users_dir.mkdir(exist_ok=True) self.db_path = self.workspace_dir / "memory_index.db" # Allow cross-thread usage for async runtime compatibility self.db = sqlite3.connect(str(self.db_path), check_same_thread=False) self.db.row_factory = sqlite3.Row self._init_schema() self._init_special_files() self.observer: Optional[Observer] = None self.dirty = False def _init_schema(self) -> None: """Create database tables.""" self.db.execute(""" CREATE TABLE IF NOT EXISTS meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL ) """) self.db.execute(""" CREATE TABLE IF NOT EXISTS files ( path TEXT PRIMARY KEY, hash TEXT NOT NULL, mtime INTEGER NOT NULL, size INTEGER NOT NULL ) """) self.db.execute(""" CREATE TABLE IF NOT EXISTS chunks ( id TEXT PRIMARY KEY, path TEXT NOT NULL, start_line INTEGER NOT NULL, end_line INTEGER NOT NULL, text TEXT NOT NULL, updated_at INTEGER NOT NULL ) """) self.db.execute(""" CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5( text, path UNINDEXED, start_line UNINDEXED, end_line UNINDEXED ) """) self.db.execute(""" CREATE TABLE IF NOT EXISTS tasks ( id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL, description TEXT, status TEXT DEFAULT 'pending', created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, metadata TEXT ) """) self.db.execute( "CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)" ) self.db.commit() def _init_special_files(self) -> None: """Initialize SOUL.md and default user if they don't exist.""" soul_file = self.workspace_dir / "SOUL.md" if not soul_file.exists(): soul_file.write_text(_SOUL_TEMPLATE, encoding="utf-8") print("Created SOUL.md") default_user = self.users_dir / "default.md" if not default_user.exists(): default_user.write_text(_USER_TEMPLATE, encoding="utf-8") print("Created users/default.md") @staticmethod def _hash_text(text: str) -> str: """Create a truncated SHA-256 hash of text content.""" return hashlib.sha256(text.encode()).hexdigest()[:HASH_PREFIX_LENGTH] @staticmethod def _chunk_markdown( content: str, chunk_size: int = DEFAULT_CHUNK_SIZE ) -> List[Dict]: """Split markdown into chunks by paragraphs.""" lines = content.split("\n") chunks: List[Dict] = [] current_chunk: List[str] = [] current_start = 1 for i, line in enumerate(lines, 1): current_chunk.append(line) is_break = not line.strip() is_too_large = len("\n".join(current_chunk)) >= chunk_size if is_break or is_too_large: text = "\n".join(current_chunk).strip() if text: chunks.append({ "text": text, "start_line": current_start, "end_line": i, }) current_chunk = [] current_start = i + 1 # Add remaining chunk if current_chunk: text = "\n".join(current_chunk).strip() if text: chunks.append({ "text": text, "start_line": current_start, "end_line": len(lines), }) return chunks def index_file(self, file_path: Path) -> None: """Index a markdown file.""" if not file_path.exists() or file_path.suffix != ".md": return stat = file_path.stat() rel_path = str(file_path.relative_to(self.workspace_dir)) content = file_path.read_text(encoding="utf-8") file_hash = self._hash_text(content) # Check if file needs reindexing existing = self.db.execute( "SELECT hash FROM files WHERE path = ?", (rel_path,) ).fetchone() if existing and existing["hash"] == file_hash: return # File unchanged # Remove old chunks self.db.execute( "DELETE FROM chunks WHERE path = ?", (rel_path,) ) self.db.execute( "DELETE FROM chunks_fts WHERE path = ?", (rel_path,) ) # Create new chunks chunks = self._chunk_markdown(content) now = int(time.time() * 1000) for chunk in chunks: chunk_id = self._hash_text( f"{rel_path}:{chunk['start_line']}:" f"{chunk['end_line']}:{chunk['text']}" ) self.db.execute( """ INSERT OR REPLACE INTO chunks (id, path, start_line, end_line, text, updated_at) VALUES (?, ?, ?, ?, ?, ?) """, ( chunk_id, rel_path, chunk["start_line"], chunk["end_line"], chunk["text"], now, ), ) self.db.execute( """ INSERT INTO chunks_fts (text, path, start_line, end_line) VALUES (?, ?, ?, ?) """, ( chunk["text"], rel_path, chunk["start_line"], chunk["end_line"], ), ) # Update file record self.db.execute( """ INSERT OR REPLACE INTO files (path, hash, mtime, size) VALUES (?, ?, ?, ?) """, (rel_path, file_hash, int(stat.st_mtime * 1000), stat.st_size), ) self.db.commit() print(f"Indexed {rel_path} ({len(chunks)} chunks)") def sync(self) -> None: """Sync all markdown files in workspace.""" print("\nSyncing memory files...") soul_file = self.workspace_dir / "SOUL.md" if soul_file.exists(): self.index_file(soul_file) memory_file = self.workspace_dir / "MEMORY.md" if memory_file.exists(): self.index_file(memory_file) for user_file in self.users_dir.glob("*.md"): self.index_file(user_file) for md_file in self.memory_dir.glob("*.md"): self.index_file(md_file) self.dirty = False print("Sync complete!\n") @staticmethod def _sanitize_fts5_query(query: str) -> str: """Sanitize query string for FTS5 MATCH to prevent injection.""" # Remove or escape FTS5 special characters # Wrap in quotes to treat as phrase search sanitized = query.replace('"', '""') # Escape double quotes return f'"{sanitized}"' def search(self, query: str, max_results: int = 5) -> List[Dict]: """Search memory using full-text search.""" # Sanitize query to prevent FTS5 injection safe_query = self._sanitize_fts5_query(query) results = self.db.execute( """ SELECT chunks.path, chunks.start_line, chunks.end_line, snippet(chunks_fts, 0, '**', '**', '...', 64) as snippet, bm25(chunks_fts) as score FROM chunks_fts JOIN chunks ON chunks.path = chunks_fts.path AND chunks.start_line = chunks_fts.start_line WHERE chunks_fts MATCH ? ORDER BY score LIMIT ? """, (safe_query, max_results), ).fetchall() return [dict(row) for row in results] def write_memory(self, content: str, daily: bool = True) -> None: """Write to memory file.""" if daily: today = datetime.now().strftime("%Y-%m-%d") file_path = self.memory_dir / f"{today}.md" else: file_path = self.workspace_dir / "MEMORY.md" if file_path.exists(): existing = file_path.read_text(encoding="utf-8") content = f"{existing}\n\n{content}" file_path.write_text(content, encoding="utf-8") self.index_file(file_path) print(f"Written to {file_path.name}") def update_soul(self, content: str, append: bool = False) -> None: """Update SOUL.md (agent personality).""" soul_file = self.workspace_dir / "SOUL.md" if append and soul_file.exists(): existing = soul_file.read_text(encoding="utf-8") content = f"{existing}\n\n{content}" soul_file.write_text(content, encoding="utf-8") self.index_file(soul_file) print("Updated SOUL.md") def update_user( self, username: str, content: str, append: bool = False ) -> None: """Update user-specific memory.""" # Validate username to prevent path traversal if not username or not username.replace("-", "").replace("_", "").isalnum(): raise ValueError( "Invalid username: must contain only alphanumeric, " "hyphens, and underscores" ) user_file = self.users_dir / f"{username}.md" # Verify the resolved path is within users_dir try: resolved = user_file.resolve() if not resolved.is_relative_to(self.users_dir.resolve()): raise ValueError("Path traversal detected in username") except (ValueError, OSError) as e: raise ValueError(f"Invalid username path: {e}") if append and user_file.exists(): existing = user_file.read_text(encoding="utf-8") content = f"{existing}\n\n{content}" elif not user_file.exists(): content = f"# User: {username}\n\n{content}" user_file.write_text(content, encoding="utf-8") self.index_file(user_file) print(f"Updated users/{username}.md") def get_soul(self) -> str: """Get SOUL.md content.""" soul_file = self.workspace_dir / "SOUL.md" if soul_file.exists(): return soul_file.read_text(encoding="utf-8") return "" def get_user(self, username: str) -> str: """Get user-specific content.""" # Validate username to prevent path traversal if not username or not username.replace("-", "").replace("_", "").isalnum(): raise ValueError( "Invalid username: must contain only alphanumeric, " "hyphens, and underscores" ) user_file = self.users_dir / f"{username}.md" # Verify the resolved path is within users_dir try: resolved = user_file.resolve() if not resolved.is_relative_to(self.users_dir.resolve()): raise ValueError("Path traversal detected in username") except (ValueError, OSError) as e: raise ValueError(f"Invalid username path: {e}") if user_file.exists(): return user_file.read_text(encoding="utf-8") return "" def list_users(self) -> List[str]: """List all users with memory files.""" return [f.stem for f in self.users_dir.glob("*.md")] def search_user( self, username: str, query: str, max_results: int = 5 ) -> List[Dict]: """Search within a specific user's memory.""" # Validate username to prevent path traversal if not username or not username.replace("-", "").replace("_", "").isalnum(): raise ValueError( "Invalid username: must contain only alphanumeric, " "hyphens, and underscores" ) user_path = f"users/{username}.md" # Sanitize query to prevent FTS5 injection safe_query = self._sanitize_fts5_query(query) results = self.db.execute( """ SELECT chunks.path, chunks.start_line, chunks.end_line, snippet(chunks_fts, 0, '**', '**', '...', 64) as snippet, bm25(chunks_fts) as score FROM chunks_fts JOIN chunks ON chunks.path = chunks_fts.path AND chunks.start_line = chunks_fts.start_line WHERE chunks_fts MATCH ? AND chunks.path = ? ORDER BY score LIMIT ? """, (safe_query, user_path, max_results), ).fetchall() return [dict(row) for row in results] def read_file( self, rel_path: str, from_line: Optional[int] = None, num_lines: Optional[int] = None, ) -> str: """Read content from a memory file.""" file_path = self.workspace_dir / rel_path # Verify the resolved path is within workspace_dir try: resolved = file_path.resolve() if not resolved.is_relative_to(self.workspace_dir.resolve()): raise ValueError("Path traversal detected") except (ValueError, OSError) as e: raise ValueError(f"Invalid file path: {e}") if not file_path.exists(): raise FileNotFoundError(f"File not found") content = file_path.read_text(encoding="utf-8") if from_line is not None: lines = content.split("\n") start = max(0, from_line - 1) end = start + num_lines if num_lines else len(lines) return "\n".join(lines[start:end]) return content def status(self) -> Dict: """Get memory system status.""" files = self.db.execute( "SELECT COUNT(*) as count FROM files" ).fetchone() chunks = self.db.execute( "SELECT COUNT(*) as count FROM chunks" ).fetchone() return { "workspace": str(self.workspace_dir), "database": str(self.db_path), "files": files["count"], "chunks": chunks["count"], "dirty": self.dirty, } def start_watching(self) -> None: """Start file watcher for auto-sync.""" class _MemoryFileHandler(FileSystemEventHandler): def __init__(self, memory_system: "MemorySystem") -> None: self.memory_system = memory_system def on_modified(self, event) -> None: if event.src_path.endswith(".md"): self.memory_system.dirty = True print( f"Detected change: {Path(event.src_path).name}" ) self.observer = Observer() handler = _MemoryFileHandler(self) self.observer.schedule( handler, str(self.workspace_dir), recursive=True ) self.observer.start() print(f"Watching {self.workspace_dir} for changes...") def stop_watching(self) -> None: """Stop file watcher.""" if self.observer: self.observer.stop() self.observer.join() def add_task( self, title: str, description: str = "", metadata: Optional[Dict] = None, ) -> int: """Add task for tracking.""" now = int(time.time() * 1000) cursor = self.db.execute( """ INSERT INTO tasks (title, description, status, created_at, updated_at, metadata) VALUES (?, ?, 'pending', ?, ?, ?) """, (title, description, now, now, str(metadata or {})), ) self.db.commit() return cursor.lastrowid def update_task( self, task_id: int, status: Optional[str] = None, description: Optional[str] = None, ) -> None: """Update task status or description.""" now = int(time.time() * 1000) updates = ["updated_at = ?"] params: list = [now] if status: updates.append("status = ?") params.append(status) if description: updates.append("description = ?") params.append(description) params.append(task_id) self.db.execute( f"UPDATE tasks SET {', '.join(updates)} WHERE id = ?", params, ) self.db.commit() def get_tasks(self, status: Optional[str] = None) -> List[Dict]: """Get tasks, optionally filtered by status.""" if status: rows = self.db.execute( "SELECT * FROM tasks WHERE status = ? " "ORDER BY created_at DESC", (status,), ).fetchall() else: rows = self.db.execute( "SELECT * FROM tasks ORDER BY created_at DESC" ).fetchall() return [dict(row) for row in rows] def close(self) -> None: """Close database and cleanup.""" self.stop_watching() self.db.close() if __name__ == "__main__": memory = MemorySystem() memory.sync() memory.update_soul( """ ## Learning Style - I learn from each interaction - I adapt to user preferences - I maintain consistency in my personality """, append=True, ) memory.update_user( "alice", """ ## Preferences - Likes detailed technical explanations - Working on Python projects - Prefers morning work sessions ## Current Projects - Building a memory system - Learning SQLite FTS5 """, ) memory.update_user( "bob", """ ## Preferences - Prefers concise answers - JavaScript developer - Works late nights ## Current Focus - React application - API integration """, ) memory.write_memory( """ # Project Setup Notes - Using SQLite for fast indexing - Markdown files are the source of truth - Daily logs in memory/YYYY-MM-DD.md - Long-term notes in MEMORY.md - SOUL.md defines agent personality - users/*.md for user-specific context """, daily=False, ) memory.write_memory( """ ## Today's Progress - Implemented basic memory system - Added full-text search with FTS5 - Added SOUL.md and user files - File watching works great """, daily=True, ) print("\nSearching for 'sqlite':") results = memory.search("sqlite") for result in results: print( f"\n{result['path']}:{result['start_line']}-" f"{result['end_line']}" ) print(f" {result['snippet']}") print(f" (score: {result['score']:.2f})") print("\n\nSearching Alice's memory for 'python':") alice_results = memory.search_user("alice", "python") for result in alice_results: print( f"\n{result['path']}:{result['start_line']}-" f"{result['end_line']}" ) print(f" {result['snippet']}") print("\n\nSOUL Content Preview:") soul = memory.get_soul() print(soul[:200] + "...") print(f"\n\nUsers with memory: {', '.join(memory.list_users())}") print("\nMemory Status:") status = memory.status() for key, value in status.items(): print(f" {key}: {value}") memory.close()