"""
Simple Memory System - SQLite + Markdown.

Inspired by OpenClaw's memory implementation but simplified.

Markdown files in the workspace are the source of truth. They are split into
chunks which are indexed twice: in an SQLite FTS5 table for keyword (BM25)
search and in a usearch vector index for semantic search.
"""

import hashlib
import re
import sqlite3
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

import numpy as np
from fastembed import TextEmbedding
from usearch.index import Index
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

# Default chunk size for splitting markdown into indexable segments
DEFAULT_CHUNK_SIZE = 500

# Hash prefix length for content fingerprinting
HASH_PREFIX_LENGTH = 16

# Embedding model and its output dimensionality (all-MiniLM-L6-v2 -> 384)
EMBEDDING_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
EMBEDDING_DIM = 384

# Weights used by MemorySystem.search_hybrid when combining scores
VECTOR_WEIGHT = 0.7
BM25_WEIGHT = 0.3

# Matches Windows-style absolute paths or relative paths ending in a known
# extension; used by MemorySystem.compact_conversation.
_FILE_PATH_RE = re.compile(
    r'[a-zA-Z]:[\\\/][\w\\\/\-\.]+\.\w+|[\w\/\-\.]+\.(?:py|md|yaml|yml|json|txt|js|ts)'
)

# Default SOUL.md template for new workspaces
_SOUL_TEMPLATE = """\
# SOUL - Agent Personality

## Core Identity
- I am a helpful, knowledgeable assistant
- I value clarity, accuracy, and user experience

## Communication Style
- Be concise but thorough
- Use examples when helpful
- Ask clarifying questions when needed

## Preferences
- Prefer simple, maintainable solutions
- Document important decisions
- Learn from interactions

## Memory Usage
- Store important facts in MEMORY.md
- Track daily activities in memory/YYYY-MM-DD.md
- Remember user preferences in users/[username].md
"""

# Default user profile template
_USER_TEMPLATE = """\
# User: default

## Preferences
- Communication style: professional
- Detail level: moderate
- Timezone: UTC

## Context
- Projects: []
- Interests: []
- Goals: []

## Notes
(Add user-specific notes here)
"""


class MemorySystem:
    """Simple memory system using SQLite for indexing and Markdown for storage."""

    def __init__(self, workspace_dir: str = "./memory_workspace") -> None:
        """Open (or create) a workspace, its database and its vector index.

        Args:
            workspace_dir: Directory holding the markdown files, the SQLite
                database (``memory_index.db``) and the vector index
                (``vectors.usearch``). Created if missing.
        """
        self.workspace_dir = Path(workspace_dir)
        # parents=True so a nested workspace path can be created in one go.
        self.workspace_dir.mkdir(parents=True, exist_ok=True)

        self.memory_dir = self.workspace_dir / "memory"
        self.memory_dir.mkdir(exist_ok=True)

        self.users_dir = self.workspace_dir / "users"
        self.users_dir.mkdir(exist_ok=True)

        self.db_path = self.workspace_dir / "memory_index.db"
        # Allow cross-thread usage for async runtime compatibility
        self.db = sqlite3.connect(str(self.db_path), check_same_thread=False)
        self.db.row_factory = sqlite3.Row

        self._init_schema()
        self._init_special_files()

        # Initialize embedding model (384-dim, local, $0 cost)
        print("Loading FastEmbed model...")
        self.embedding_model = TextEmbedding(model_name=EMBEDDING_MODEL_NAME)

        # Initialize vector index
        self.vector_index_path = self.workspace_dir / "vectors.usearch"
        self.vector_index = Index(
            ndim=EMBEDDING_DIM,  # all-MiniLM-L6-v2 dimensionality
            metric="cos",  # cosine similarity
        )

        # Load existing index if present
        if self.vector_index_path.exists():
            self.vector_index.load(str(self.vector_index_path))
            print(f"Loaded {len(self.vector_index)} vectors from index")
        else:
            print("Created new vector index")

        self.observer: Optional[Observer] = None
        self.dirty = False

    def _init_schema(self) -> None:
        """Create database tables and apply the ``vector_id`` migration."""
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS meta (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )
        """)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS files (
                path TEXT PRIMARY KEY,
                hash TEXT NOT NULL,
                mtime INTEGER NOT NULL,
                size INTEGER NOT NULL
            )
        """)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS chunks (
                id TEXT PRIMARY KEY,
                path TEXT NOT NULL,
                start_line INTEGER NOT NULL,
                end_line INTEGER NOT NULL,
                text TEXT NOT NULL,
                updated_at INTEGER NOT NULL,
                vector_id INTEGER
            )
        """)
        self.db.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS chunks_fts USING fts5(
                text,
                path UNINDEXED,
                start_line UNINDEXED,
                end_line UNINDEXED
            )
        """)
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                description TEXT,
                status TEXT DEFAULT 'pending',
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL,
                metadata TEXT
            )
        """)
        self.db.execute(
            "CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)"
        )

        # Migration: add vector_id to chunks tables created before the column
        # existed. Inspect the schema instead of catching OperationalError so
        # unrelated failures (locked or corrupt database) are not hidden.
        # Column 1 of each PRAGMA table_info row is the column name.
        existing_columns = {
            row[1] for row in self.db.execute("PRAGMA table_info(chunks)")
        }
        if "vector_id" not in existing_columns:
            self.db.execute("ALTER TABLE chunks ADD COLUMN vector_id INTEGER")
            print("Added vector_id column to chunks table")

        self.db.commit()

    def _init_special_files(self) -> None:
        """Initialize SOUL.md and default user if they don't exist."""
        soul_file = self.workspace_dir / "SOUL.md"
        if not soul_file.exists():
            soul_file.write_text(_SOUL_TEMPLATE, encoding="utf-8")
            print("Created SOUL.md")

        default_user = self.users_dir / "default.md"
        if not default_user.exists():
            default_user.write_text(_USER_TEMPLATE, encoding="utf-8")
            print("Created users/default.md")

    @staticmethod
    def _hash_text(text: str) -> str:
        """Create a truncated SHA-256 hash of text content."""
        return hashlib.sha256(text.encode()).hexdigest()[:HASH_PREFIX_LENGTH]

    @staticmethod
    def _chunk_markdown(
        content: str, chunk_size: int = DEFAULT_CHUNK_SIZE
    ) -> List[Dict]:
        """Split markdown into chunks by paragraphs.

        A chunk ends at a blank line or as soon as the accumulated text
        reaches ``chunk_size`` characters, whichever comes first.

        Args:
            content: Full markdown text.
            chunk_size: Character count at which a chunk is closed.

        Returns:
            Dicts with ``text`` (stripped), ``start_line`` and ``end_line``
            (1-based, inclusive). Whitespace-only chunks are dropped.
        """
        lines = content.split("\n")
        chunks: List[Dict] = []
        current_chunk: List[str] = []
        current_start = 1

        def flush(end_line: int) -> None:
            # Emit the pending lines as one chunk unless they are blank.
            text = "\n".join(current_chunk).strip()
            if text:
                chunks.append({
                    "text": text,
                    "start_line": current_start,
                    "end_line": end_line,
                })

        for i, line in enumerate(lines, 1):
            current_chunk.append(line)

            is_break = not line.strip()
            is_too_large = len("\n".join(current_chunk)) >= chunk_size

            if is_break or is_too_large:
                flush(i)
                current_chunk = []
                current_start = i + 1

        # Add remaining chunk
        if current_chunk:
            flush(len(lines))

        return chunks

    def index_file(self, file_path: Path) -> None:
        """Index a markdown file.

        Does nothing when the file is missing, is not ``.md``, or its content
        hash matches the one recorded in the ``files`` table. Otherwise the
        file's old chunks and vectors are replaced with freshly generated
        ones and the vector index is saved to disk.

        Args:
            file_path: Path of the file; must be inside ``workspace_dir``
                (``Path.relative_to`` raises ``ValueError`` otherwise).
        """
        if not file_path.exists() or file_path.suffix != ".md":
            return

        stat = file_path.stat()
        rel_path = str(file_path.relative_to(self.workspace_dir))
        content = file_path.read_text(encoding="utf-8")
        file_hash = self._hash_text(content)

        # Check if file needs reindexing
        existing = self.db.execute(
            "SELECT hash FROM files WHERE path = ?", (rel_path,)
        ).fetchone()
        if existing and existing["hash"] == file_hash:
            return  # File unchanged

        # Remove old chunks and their vectors
        old_chunks = self.db.execute(
            "SELECT vector_id FROM chunks WHERE path = ?", (rel_path,)
        ).fetchall()

        # Remove vectors from index
        for row in old_chunks:
            if row["vector_id"] is not None:
                try:
                    self.vector_index.remove(row["vector_id"])
                except (KeyError, IndexError):
                    # Deliberate best-effort: the vector might not exist in
                    # the index (e.g. index file was deleted), safe to ignore.
                    pass

        # Remove from database
        self.db.execute("DELETE FROM chunks WHERE path = ?", (rel_path,))
        self.db.execute("DELETE FROM chunks_fts WHERE path = ?", (rel_path,))

        # Create new chunks
        chunks = self._chunk_markdown(content)
        now = int(time.time() * 1000)

        for chunk in chunks:
            chunk_id = self._hash_text(
                f"{rel_path}:{chunk['start_line']}:"
                f"{chunk['end_line']}:{chunk['text']}"
            )

            # Generate embedding and store in vector index
            embedding = self._generate_embedding(chunk["text"])

            # Use hash of chunk_id as unique integer key for usearch
            # (15 hex digits = 60 bits, so it fits a signed 64-bit column).
            vector_id = int(
                hashlib.sha256(chunk_id.encode()).hexdigest()[:15], 16
            )
            self.vector_index.add(vector_id, embedding)

            self.db.execute(
                """
                INSERT OR REPLACE INTO chunks
                    (id, path, start_line, end_line, text, updated_at, vector_id)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    chunk_id,
                    rel_path,
                    chunk["start_line"],
                    chunk["end_line"],
                    chunk["text"],
                    now,
                    vector_id,
                ),
            )

            self.db.execute(
                """
                INSERT INTO chunks_fts (text, path, start_line, end_line)
                VALUES (?, ?, ?, ?)
                """,
                (
                    chunk["text"],
                    rel_path,
                    chunk["start_line"],
                    chunk["end_line"],
                ),
            )

        # Update file record
        self.db.execute(
            """
            INSERT OR REPLACE INTO files (path, hash, mtime, size)
            VALUES (?, ?, ?, ?)
            """,
            (rel_path, file_hash, int(stat.st_mtime * 1000), stat.st_size),
        )

        self.db.commit()

        # Save vector index to disk
        self.vector_index.save(str(self.vector_index_path))

        print(f"Indexed {rel_path} ({len(chunks)} chunks)")

    def sync(self) -> None:
        """Sync all markdown files in workspace.

        Indexes SOUL.md, MEMORY.md, ``users/*.md`` and ``memory/*.md`` and
        then clears the ``dirty`` flag.
        """
        print("\nSyncing memory files...")

        soul_file = self.workspace_dir / "SOUL.md"
        if soul_file.exists():
            self.index_file(soul_file)

        memory_file = self.workspace_dir / "MEMORY.md"
        if memory_file.exists():
            self.index_file(memory_file)

        for user_file in self.users_dir.glob("*.md"):
            self.index_file(user_file)

        for md_file in self.memory_dir.glob("*.md"):
            self.index_file(md_file)

        self.dirty = False
        print("Sync complete!\n")

    @staticmethod
    def _sanitize_fts5_query(query: str) -> str:
        """Sanitize query string for FTS5 MATCH to prevent injection.

        Embedded double quotes are doubled and the whole query is wrapped in
        double quotes, so FTS5 treats it as a single phrase rather than as
        query syntax.
        """
        sanitized = query.replace('"', '""')  # Escape double quotes
        return f'"{sanitized}"'

    @staticmethod
    def _validate_username(username: str) -> None:
        """Reject usernames that are not alphanumeric, ``-`` or ``_``.

        Raises:
            ValueError: If ``username`` is empty or contains other characters.
        """
        if not username or not username.replace("-", "").replace("_", "").isalnum():
            raise ValueError(
                "Invalid username: must contain only alphanumeric, "
                "hyphens, and underscores"
            )

    def _user_file(self, username: str) -> Path:
        """Return the markdown path for ``username`` after validating it.

        Raises:
            ValueError: If the username is invalid or the resolved path is
                not inside ``users_dir``.
        """
        self._validate_username(username)
        user_file = self.users_dir / f"{username}.md"

        # Verify the resolved path is within users_dir
        try:
            inside = user_file.resolve().is_relative_to(self.users_dir.resolve())
        except (ValueError, OSError) as e:
            raise ValueError(f"Invalid username path: {e}") from e
        if not inside:
            raise ValueError(
                "Invalid username path: Path traversal detected in username"
            )
        return user_file

    def _generate_embedding(self, text: str) -> np.ndarray:
        """Generate 384-dim embedding using FastEmbed (local, $0 cost)."""
        # FastEmbed returns a generator, get first (and only) result
        embeddings = list(self.embedding_model.embed([text]))
        return embeddings[0]

    def _fts_search(
        self, query: str, max_results: int, path: Optional[str] = None
    ) -> List[Dict]:
        """Run a BM25-ordered FTS5 query, optionally restricted to one path.

        Returns an empty list for a blank query instead of sending an empty
        phrase to FTS5.
        """
        if not query or not query.strip():
            return []

        # Sanitize query to prevent FTS5 injection
        safe_query = self._sanitize_fts5_query(query)

        path_filter = "AND chunks.path = ?" if path is not None else ""
        params: list = [safe_query]
        if path is not None:
            params.append(path)
        params.append(max_results)

        # Only the fixed path_filter fragment is interpolated; all values are
        # bound as parameters.
        results = self.db.execute(
            f"""
            SELECT
                chunks.path,
                chunks.start_line,
                chunks.end_line,
                snippet(chunks_fts, 0, '**', '**', '...', 64) as snippet,
                bm25(chunks_fts) as score
            FROM chunks_fts
            JOIN chunks ON chunks.path = chunks_fts.path
                AND chunks.start_line = chunks_fts.start_line
            WHERE chunks_fts MATCH ?
            {path_filter}
            ORDER BY score
            LIMIT ?
            """,
            params,
        ).fetchall()

        return [dict(row) for row in results]

    def search(self, query: str, max_results: int = 5) -> List[Dict]:
        """Search memory using full-text search.

        Args:
            query: Text to look for; matched as a single phrase.
            max_results: Maximum number of rows to return.

        Returns:
            Dicts with ``path``, ``start_line``, ``end_line``, ``snippet``
            and ``score`` (raw BM25; lower is better), best match first.
        """
        return self._fts_search(query, max_results)

    def search_hybrid(self, query: str, max_results: int = 5) -> List[Dict]:
        """
        Hybrid search combining semantic (vector) and keyword (BM25) search.

        Uses 0.7 vector similarity + 0.3 BM25 scoring for optimal retrieval.
        Falls back to keyword search when the vector index is empty.

        Args:
            query: Natural-language query.
            max_results: Maximum number of results to return.

        Returns:
            Dicts with ``path``, ``start_line``, ``end_line``, ``snippet``
            and ``score`` (combined, higher is better), best match first.
        """
        if max_results <= 0 or not query or not query.strip():
            return []

        if len(self.vector_index) == 0:
            # No vectors yet, fall back to keyword search
            return self.search(query, max_results)

        # Retrieve more candidates than requested so re-ranking has room.
        candidate_count = max_results * 3

        # 1. Semantic candidates. usearch reports cosine distance; convert it
        #    to a similarity and to a plain float (not a numpy scalar).
        query_embedding = self._generate_embedding(query)
        vector_matches = self.vector_index.search(query_embedding, candidate_count)
        vector_scores: Dict[int, float] = {
            int(match.key): 1.0 - float(match.distance)
            for match in vector_matches
        }

        # 2. Keyword candidates, best BM25 first so LIMIT keeps the most
        #    relevant rows rather than an arbitrary subset.
        safe_query = self._sanitize_fts5_query(query)
        bm25_results = self.db.execute(
            """
            SELECT
                chunks.id,
                chunks.path,
                chunks.start_line,
                chunks.end_line,
                chunks.vector_id,
                snippet(chunks_fts, 0, '**', '**', '...', 64) as snippet,
                bm25(chunks_fts) as bm25_score
            FROM chunks_fts
            JOIN chunks ON chunks.path = chunks_fts.path
                AND chunks.start_line = chunks_fts.start_line
            WHERE chunks_fts MATCH ?
            ORDER BY bm25_score
            LIMIT ?
            """,
            (safe_query, candidate_count),
        ).fetchall()
        bm25_map: Dict[str, Dict] = {row["id"]: dict(row) for row in bm25_results}

        # 3. Normalize BM25 scores (they're negative, lower is better) to
        #    0-1 with 1 being the best keyword match.
        if bm25_results:
            bm25_values = [row["bm25_score"] for row in bm25_results]
            min_bm25 = min(bm25_values)
            max_bm25 = max(bm25_values)
            bm25_range = max_bm25 - min_bm25 if max_bm25 != min_bm25 else 1
            for chunk_data in bm25_map.values():
                normalized = (chunk_data["bm25_score"] - min_bm25) / bm25_range
                chunk_data["normalized_bm25"] = 1 - normalized

        # 4. Batch-fetch all chunks matching vector results in a single query
        #    instead of N separate queries.
        all_chunk_data: Dict[str, Dict] = {}
        if vector_scores:
            vector_id_list = list(vector_scores)
            placeholders = ",".join("?" * len(vector_id_list))
            vector_chunks = self.db.execute(
                f"SELECT * FROM chunks WHERE vector_id IN ({placeholders})",
                vector_id_list,
            ).fetchall()
            for row in vector_chunks:
                all_chunk_data[row["id"]] = dict(row)

        # Chunks found only by keyword search.
        for chunk_id, bm25_data in bm25_map.items():
            all_chunk_data.setdefault(chunk_id, bm25_data)

        # 5. Combine scores: 70% semantic, 30% keyword
        combined_results: List[Dict] = []
        for chunk_id, chunk_data in all_chunk_data.items():
            vector_id = chunk_data.get("vector_id")
            # `is not None`: 0 is a valid vector key.
            vector_score = (
                vector_scores.get(vector_id, 0.0) if vector_id is not None else 0.0
            )

            bm25_entry = bm25_map.get(chunk_id)
            if bm25_entry is not None:
                bm25_score = bm25_entry.get("normalized_bm25", 0.0)
                snippet = bm25_entry["snippet"]
            else:
                # Vector-only hit: no FTS snippet, use the start of the text.
                bm25_score = 0.0
                snippet_text = chunk_data.get("text", "")
                snippet = (
                    snippet_text[:64] + "..."
                    if len(snippet_text) > 64
                    else snippet_text
                )

            combined_results.append({
                "path": chunk_data["path"],
                "start_line": chunk_data["start_line"],
                "end_line": chunk_data["end_line"],
                "snippet": snippet,
                "score": VECTOR_WEIGHT * vector_score + BM25_WEIGHT * bm25_score,
            })

        # 6. Sort by combined score and return top results
        combined_results.sort(key=lambda x: x["score"], reverse=True)
        return combined_results[:max_results]

    def compact_conversation(
        self,
        user_message: str,
        assistant_response: str,
        tools_used: Optional[List[str]] = None,
    ) -> str:
        """Create a compact summary of a conversation for memory storage.

        Args:
            user_message: The user's input
            assistant_response: The assistant's full response
            tools_used: Optional list of tool names used
                (e.g., ['read_file', 'edit_file'])

        Returns:
            Compact summary string
        """
        # Extract file paths mentioned, de-duplicated in first-seen order so
        # the output is the same on every run (a set would reorder them).
        file_paths = list(dict.fromkeys(_FILE_PATH_RE.findall(assistant_response)))

        # Truncate long responses
        if len(assistant_response) > 300:
            # Try to get first complete sentence or paragraph
            first_sentence = assistant_response.split('. ')[0]
            if len(first_sentence) < 200:
                summary = first_sentence + '.'
            else:
                summary = assistant_response[:200] + '...'
        else:
            summary = assistant_response

        # Build compact entry
        compact = f"**User**: {user_message}\n**Action**: {summary}"

        if tools_used:
            compact += f"\n**Tools**: {', '.join(map(str, tools_used))}"

        if file_paths:
            compact += f"\n**Files**: {', '.join(file_paths[:3])}"  # Max 3 file paths

        return compact

    def write_memory(self, content: str, daily: bool = True) -> None:
        """Write to memory file.

        Appends ``content`` (separated by a blank line) to today's
        ``memory/YYYY-MM-DD.md`` when ``daily`` is true, otherwise to
        ``MEMORY.md``, then re-indexes that file.
        """
        if daily:
            today = datetime.now().strftime("%Y-%m-%d")
            file_path = self.memory_dir / f"{today}.md"
        else:
            file_path = self.workspace_dir / "MEMORY.md"

        if file_path.exists():
            existing = file_path.read_text(encoding="utf-8")
            content = f"{existing}\n\n{content}"

        file_path.write_text(content, encoding="utf-8")
        self.index_file(file_path)
        print(f"Written to {file_path.name}")

    def update_soul(self, content: str, append: bool = False) -> None:
        """Update SOUL.md (agent personality).

        Replaces the file with ``content``, or appends to it when ``append``
        is true and the file exists, then re-indexes it.
        """
        soul_file = self.workspace_dir / "SOUL.md"

        if append and soul_file.exists():
            existing = soul_file.read_text(encoding="utf-8")
            content = f"{existing}\n\n{content}"

        soul_file.write_text(content, encoding="utf-8")
        self.index_file(soul_file)
        print("Updated SOUL.md")

    def update_user(
        self, username: str, content: str, append: bool = False
    ) -> None:
        """Update user-specific memory.

        A new file gets a ``# User: <username>`` heading. An existing file is
        replaced, or appended to when ``append`` is true.

        Raises:
            ValueError: If ``username`` is invalid.
        """
        user_file = self._user_file(username)

        if append and user_file.exists():
            existing = user_file.read_text(encoding="utf-8")
            content = f"{existing}\n\n{content}"
        elif not user_file.exists():
            content = f"# User: {username}\n\n{content}"

        user_file.write_text(content, encoding="utf-8")
        self.index_file(user_file)
        print(f"Updated users/{username}.md")

    def get_soul(self) -> str:
        """Get SOUL.md content, or an empty string if it does not exist."""
        soul_file = self.workspace_dir / "SOUL.md"
        if soul_file.exists():
            return soul_file.read_text(encoding="utf-8")
        return ""

    def get_user(self, username: str) -> str:
        """Get user-specific content, or an empty string if there is none.

        Raises:
            ValueError: If ``username`` is invalid.
        """
        user_file = self._user_file(username)
        if user_file.exists():
            return user_file.read_text(encoding="utf-8")
        return ""

    def list_users(self) -> List[str]:
        """List all users with memory files."""
        return [f.stem for f in self.users_dir.glob("*.md")]

    def search_user(
        self, username: str, query: str, max_results: int = 5
    ) -> List[Dict]:
        """Search within a specific user's memory.

        Same result shape as :meth:`search`, restricted to the chunks
        indexed under ``users/<username>.md``.

        Raises:
            ValueError: If ``username`` is invalid.
        """
        # Validate username to prevent path traversal
        self._validate_username(username)
        return self._fts_search(query, max_results, path=f"users/{username}.md")

    def read_file(
        self,
        rel_path: str,
        from_line: Optional[int] = None,
        num_lines: Optional[int] = None,
    ) -> str:
        """Read content from a memory file.

        Args:
            rel_path: Path relative to the workspace directory.
            from_line: 1-based first line to return; whole file when None.
            num_lines: Number of lines to return starting at ``from_line``;
                when None or 0, reads to the end of the file.

        Raises:
            ValueError: If the path resolves outside the workspace.
            FileNotFoundError: If the file does not exist.
        """
        file_path = self.workspace_dir / rel_path

        # Verify the resolved path is within workspace_dir
        try:
            inside = file_path.resolve().is_relative_to(
                self.workspace_dir.resolve()
            )
        except (ValueError, OSError) as e:
            raise ValueError(f"Invalid file path: {e}") from e
        if not inside:
            raise ValueError("Invalid file path: Path traversal detected")

        if not file_path.exists():
            # The path is intentionally left out of the message.
            raise FileNotFoundError("File not found")

        content = file_path.read_text(encoding="utf-8")

        if from_line is not None:
            lines = content.split("\n")
            start = max(0, from_line - 1)
            end = start + num_lines if num_lines else len(lines)
            return "\n".join(lines[start:end])

        return content

    def status(self) -> Dict:
        """Get memory system status.

        Returns:
            Dict with ``workspace``, ``database``, ``files`` (row count),
            ``chunks`` (row count) and ``dirty``.
        """
        files = self.db.execute(
            "SELECT COUNT(*) as count FROM files"
        ).fetchone()
        chunks = self.db.execute(
            "SELECT COUNT(*) as count FROM chunks"
        ).fetchone()

        return {
            "workspace": str(self.workspace_dir),
            "database": str(self.db_path),
            "files": files["count"],
            "chunks": chunks["count"],
            "dirty": self.dirty,
        }

    def start_watching(self) -> None:
        """Start file watcher for auto-sync.

        The watcher only sets ``dirty`` when a ``.md`` file is modified; it
        does not re-index by itself. Calling this while a watcher is already
        running is a no-op.
        """
        if self.observer is not None:
            return  # Already watching; avoid leaking a second observer thread

        class _MemoryFileHandler(FileSystemEventHandler):
            def __init__(self, memory_system: "MemorySystem") -> None:
                super().__init__()
                self.memory_system = memory_system

            def on_modified(self, event) -> None:
                if event.is_directory:
                    return
                if event.src_path.endswith(".md"):
                    self.memory_system.dirty = True
                    print(
                        f"Detected change: {Path(event.src_path).name}"
                    )

        self.observer = Observer()
        handler = _MemoryFileHandler(self)
        self.observer.schedule(
            handler, str(self.workspace_dir), recursive=True
        )
        self.observer.start()
        print(f"Watching {self.workspace_dir} for changes...")

    def stop_watching(self) -> None:
        """Stop file watcher (no-op when none is running)."""
        if self.observer:
            self.observer.stop()
            self.observer.join()
            # Drop the reference so start_watching() can create a fresh one.
            self.observer = None

    def add_task(
        self,
        title: str,
        description: str = "",
        metadata: Optional[Dict] = None,
    ) -> int:
        """Add task for tracking.

        Args:
            title: Task title.
            description: Optional longer description.
            metadata: Optional dict stored in the ``metadata`` column.

        Returns:
            The id of the inserted row.
        """
        now = int(time.time() * 1000)
        # NOTE: metadata is stored as the Python repr of the dict (str()),
        # not as JSON; kept as-is so existing rows and readers stay valid.
        cursor = self.db.execute(
            """
            INSERT INTO tasks
                (title, description, status, created_at, updated_at, metadata)
            VALUES (?, ?, 'pending', ?, ?, ?)
            """,
            (title, description, now, now, str(metadata or {})),
        )
        self.db.commit()
        return cursor.lastrowid

    def update_task(
        self,
        task_id: int,
        status: Optional[str] = None,
        description: Optional[str] = None,
    ) -> None:
        """Update task status or description.

        ``updated_at`` is always refreshed. A falsy ``status`` leaves the
        status untouched; ``description`` is written whenever it is not None,
        so passing ``""`` clears it.
        """
        now = int(time.time() * 1000)
        updates = ["updated_at = ?"]
        params: list = [now]

        if status:
            updates.append("status = ?")
            params.append(status)

        if description is not None:
            updates.append("description = ?")
            params.append(description)

        params.append(task_id)

        # Only fixed column fragments are interpolated; values are bound.
        self.db.execute(
            f"UPDATE tasks SET {', '.join(updates)} WHERE id = ?",
            params,
        )
        self.db.commit()

    def get_tasks(self, status: Optional[str] = None) -> List[Dict]:
        """Get tasks, optionally filtered by status, newest first."""
        if status:
            rows = self.db.execute(
                "SELECT * FROM tasks WHERE status = ? "
                "ORDER BY created_at DESC",
                (status,),
            ).fetchall()
        else:
            rows = self.db.execute(
                "SELECT * FROM tasks ORDER BY created_at DESC"
            ).fetchall()

        return [dict(row) for row in rows]

    def close(self) -> None:
        """Close database and cleanup."""
        self.stop_watching()

        # Save vector index before closing
        if len(self.vector_index) > 0:
            self.vector_index.save(str(self.vector_index_path))

        self.db.close()


def main() -> None:
    """Run a small end-to-end demo against the default workspace."""
    memory = MemorySystem()
    try:
        memory.sync()

        memory.update_soul(
            """
## Learning Style
- I learn from each interaction
- I adapt to user preferences
- I maintain consistency in my personality
""",
            append=True,
        )

        memory.update_user(
            "alice",
            """
## Preferences
- Likes detailed technical explanations
- Working on Python projects
- Prefers morning work sessions

## Current Projects
- Building a memory system
- Learning SQLite FTS5
""",
        )

        memory.update_user(
            "bob",
            """
## Preferences
- Prefers concise answers
- JavaScript developer
- Works late nights

## Current Focus
- React application
- API integration
""",
        )

        memory.write_memory(
            """
# Project Setup Notes
- Using SQLite for fast indexing
- Markdown files are the source of truth
- Daily logs in memory/YYYY-MM-DD.md
- Long-term notes in MEMORY.md
- SOUL.md defines agent personality
- users/*.md for user-specific context
""",
            daily=False,
        )

        memory.write_memory(
            """
## Today's Progress
- Implemented basic memory system
- Added full-text search with FTS5
- Added SOUL.md and user files
- File watching works great
""",
            daily=True,
        )

        print("\nSearching for 'sqlite':")
        results = memory.search("sqlite")
        for result in results:
            print(
                f"\n{result['path']}:{result['start_line']}-"
                f"{result['end_line']}"
            )
            print(f" {result['snippet']}")
            print(f" (score: {result['score']:.2f})")

        print("\n\nSearching Alice's memory for 'python':")
        alice_results = memory.search_user("alice", "python")
        for result in alice_results:
            print(
                f"\n{result['path']}:{result['start_line']}-"
                f"{result['end_line']}"
            )
            print(f" {result['snippet']}")

        print("\n\nSOUL Content Preview:")
        soul = memory.get_soul()
        print(soul[:200] + "...")

        print(f"\n\nUsers with memory: {', '.join(memory.list_users())}")

        print("\nMemory Status:")
        status = memory.status()
        for key, value in status.items():
            print(f" {key}: {value}")
    finally:
        # Always release the database and persist the vector index.
        memory.close()


if __name__ == "__main__":
    main()