"""MCP Tools - In-process tools using Claude Agent SDK.

These tools run directly in the Python process for better performance
compared to the traditional tool execution flow. File and system tools
are ideal candidates for MCP since they don't require external APIs.
"""

import ipaddress
import json
import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
from urllib.parse import urlparse

import httpx
from bs4 import BeautifulSoup
from claude_agent_sdk import tool, create_sdk_mcp_server

# Import memory system for hybrid search
try:
    from memory_system import MemorySystem
    MEMORY_AVAILABLE = True
except ImportError:
    MEMORY_AVAILABLE = False

# Maximum characters of tool output to return (prevents token explosion)
_MAX_TOOL_OUTPUT = 5000

# Maximum page size for web fetching (500KB)
_MAX_WEB_PAGE_SIZE = 500_000

# Maximum text content from web page (10,000 chars ≈ 2,500 tokens)
_MAX_WEB_TEXT = 10_000

# Zettelkasten vault paths
_VAULT_ROOT = Path("memory_workspace/obsidian")
_VAULT_FLEETING = _VAULT_ROOT / "fleeting"
_VAULT_DAILY = _VAULT_ROOT / "daily"
_VAULT_PERMANENT = _VAULT_ROOT / "permanent"
_VAULT_LITERATURE = _VAULT_ROOT / "literature"

# Every vault folder (used for searching and for directory creation).
_ALL_VAULT_DIRS = (
    _VAULT_FLEETING,
    _VAULT_DAILY,
    _VAULT_PERMANENT,
    _VAULT_LITERATURE,
)

# Folders whose notes may be suggested as links (daily notes are excluded).
_LINKABLE_VAULT_DIRS = (_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE)

# Headers sent with every outgoing web request.
_WEB_HEADERS = {
    "User-Agent": (
        "Garvis/1.0 (Personal Assistant Bot; "
        "+https://github.com/anthropics/claude-code)"
    )
}

# HTML elements that carry no article content and are dropped before
# extracting text.
_NON_CONTENT_ELEMENTS = [
    "script", "style", "nav", "footer", "header", "aside", "form",
]

# Characters that are invalid or troublesome in file names on common
# platforms (':' and '/' are handled separately to keep historical names).
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\*?"<>|\x00-\x1f]')


# ---------------------------------------------------------------------------
# Generic helpers
# ---------------------------------------------------------------------------

def _ok(text: str) -> Dict[str, Any]:
    """Build a successful MCP tool result holding a single text item."""
    return {"content": [{"type": "text", "text": text}]}


def _error(text: str) -> Dict[str, Any]:
    """Build a failed MCP tool result (``isError`` set) holding one text item."""
    return {"content": [{"type": "text", "text": text}], "isError": True}


def _parse_tags(tags_str: Optional[str]) -> List[str]:
    """Split a comma-separated tag string into stripped, non-empty tags.

    Empty segments (from ``"a,,b"`` or a trailing comma) are dropped so they
    never end up as empty tags in front matter or in tag searches.
    """
    if not tags_str:
        return []
    return [tag.strip() for tag in tags_str.split(",") if tag.strip()]


def _add_tags(tags: List[str], *required: str) -> List[str]:
    """Return ``tags`` plus each of ``required`` that is not already present."""
    merged = list(tags)
    for tag in required:
        if tag not in merged:
            merged.append(tag)
    return merged


# ---------------------------------------------------------------------------
# Zettelkasten helpers
# ---------------------------------------------------------------------------

def _generate_note_id() -> str:
    """Generate unique timestamp-based note ID (YYYYMMDDHHmmss)."""
    return datetime.now().strftime("%Y%m%d%H%M%S")


def _create_frontmatter(
    note_id: str,
    title: str,
    tags: Optional[List[str]] = None,
    note_type: str = "fleeting",
) -> str:
    """Create YAML front matter for zettelkasten note.

    Args:
        note_id: Identifier written to the ``id`` field.
        title: Note title. Emitted as a double-quoted scalar so titles
            containing ``:``, ``#`` or line breaks stay valid YAML.
        tags: Tags written as a YAML flow list; ``None`` means no tags.
        note_type: Value of the ``type`` field.

    Returns:
        The front matter block, terminated by a blank line.
    """
    timestamp = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    tags_str = ", ".join(tags) if tags else ""
    # A JSON string is a valid YAML double-quoted scalar.
    quoted_title = json.dumps(title, ensure_ascii=False)
    return (
        "---\n"
        f"id: {note_id}\n"
        f"title: {quoted_title}\n"
        f"created: {timestamp}\n"
        f"modified: {timestamp}\n"
        f"type: {note_type}\n"
        f"tags: [{tags_str}]\n"
        "---\n"
        "\n"
    )


def _ensure_vault_structure():
    """Ensure zettelkasten vault directories exist."""
    for dir_path in _ALL_VAULT_DIRS:
        dir_path.mkdir(parents=True, exist_ok=True)


def _safe_filename_title(title: str, max_len: int = 50) -> str:
    """Turn a note title into a file-name-safe fragment.

    Keeps the historical rules (truncate to ``max_len``, drop ``:``, turn
    ``/`` into ``-``) and additionally removes control characters and other
    characters that are invalid in file names on common platforms.
    """
    cleaned = title[:max_len].replace(":", "").replace("/", "-")
    cleaned = _UNSAFE_FILENAME_CHARS.sub("", cleaned).strip().rstrip(".")
    return cleaned or "Untitled"


def _new_note_path(vault_dir: Path, note_id: str, title: str) -> Path:
    """Return a not-yet-existing ``<id> - <title>.md`` path in ``vault_dir``.

    Note IDs only have one-second resolution, so two notes with the same
    title created within the same second would otherwise overwrite each
    other; a numeric suffix is appended in that case.
    """
    stem = f"{note_id} - {_safe_filename_title(title)}"
    candidate = vault_dir / f"{stem}.md"
    counter = 1
    while candidate.exists():
        counter += 1
        candidate = vault_dir / f"{stem} ({counter}).md"
    return candidate


def _title_from_note_path(note_path: Path) -> str:
    """Strip the ``<id> - `` prefix from a note file name, if present."""
    stem = note_path.stem
    if " - " in stem:
        return stem.split(" - ", 1)[1]
    return stem


def _extract_frontmatter_tags(content: str) -> Optional[List[str]]:
    """Return the lowercase tags from a note's YAML front matter.

    Only the simple ``tags: [a, b]`` / ``tags: a, b`` single-line form
    written by this module is understood.

    Returns:
        The list of tags (possibly empty), or ``None`` when the note has no
        front matter or no ``tags:`` line.
    """
    if not content.startswith("---"):
        return None
    parts = content.split("---", 2)
    if len(parts) < 2:
        return None
    for line in parts[1].split("\n"):
        if line.strip().startswith("tags:"):
            tags_part = line.split(":", 1)[1].strip().strip("[]")
            return [t.strip().lower() for t in tags_part.split(",") if t.strip()]
    return None


def _get_memory_system() -> Optional['MemorySystem']:
    """Get memory system instance for hybrid search.

    Returns:
        A ``MemorySystem`` rooted at the obsidian vault, or ``None`` when the
        module is not installed or cannot be initialised.
    """
    if not MEMORY_AVAILABLE:
        return None
    try:
        # Initialize memory system pointing to obsidian vault
        return MemorySystem(workspace_dir=_VAULT_ROOT)
    except Exception:
        # Hybrid search is optional; callers fall back to keyword search.
        return None


def _index_vault(memory: Any, vault_dirs: Iterable[Path]) -> None:
    """Index every markdown note under ``vault_dirs`` into ``memory``."""
    for vault_dir in vault_dirs:
        if vault_dir.exists():
            for note_path in vault_dir.glob("*.md"):
                memory.index_file(note_path)


def _find_related_notes_hybrid(title: str, content: str, max_results: int = 5) -> List[tuple]:
    """Find related notes using hybrid search (vector + keyword).

    Falls back to a plain keyword scan of the vault when the memory system
    is unavailable or fails.

    Args:
        title: Title of the note being created.
        content: Body of the note being created.
        max_results: Maximum number of related notes to return.

    Returns:
        List of ``(note_title, score)`` tuples.
    """
    memory = _get_memory_system()
    if memory:
        # Use hybrid search for better results
        try:
            _index_vault(memory, _LINKABLE_VAULT_DIRS)

            # Search with title + the first 500 chars of content as query
            query = f"{title} {content[:500]}"
            results = memory.search_hybrid(query, max_results=max_results)

            # Score = number of highlighted ("**term**") matches in the snippet
            return [
                (
                    _title_from_note_path(Path(result["path"])),
                    result.get("snippet", "").count("**") // 2,
                )
                for result in results
            ]
        except Exception:
            # Deliberate best-effort: fall back to keyword search below.
            pass

    # Fallback: keyword search. Terms are de-duplicated so a repeated word
    # cannot satisfy the two-match threshold on its own.
    candidate_terms = title.lower().split() + content.lower().split()[:20]
    search_terms = list(
        dict.fromkeys(term for term in candidate_terms if len(term) > 4)
    )

    related_notes = []
    for vault_dir in _LINKABLE_VAULT_DIRS:
        if not vault_dir.exists():
            continue
        for note_path in vault_dir.glob("*.md"):
            try:
                note_content = note_path.read_text(encoding="utf-8").lower()
            except (OSError, UnicodeDecodeError):
                continue  # Unreadable notes are skipped
            matches = sum(1 for term in search_terms if term in note_content)
            if matches >= 2:
                related_notes.append((_title_from_note_path(note_path), matches))

    related_notes.sort(key=lambda item: item[1], reverse=True)
    return related_notes[:max_results]


# ---------------------------------------------------------------------------
# Web helpers
# ---------------------------------------------------------------------------

def _is_blocked_ip(address: Any) -> bool:
    """Return True when an ``ipaddress`` address is not a public unicast one."""
    # Unwrap IPv4-mapped IPv6 (::ffff:a.b.c.d) so the IPv4 rules apply.
    mapped = getattr(address, "ipv4_mapped", None)
    if mapped is not None:
        address = mapped
    return (
        address.is_private
        or address.is_loopback
        or address.is_link_local
        or address.is_multicast
        or address.is_reserved
        or address.is_unspecified
    )


def _is_safe_url(url: str) -> bool:
    """Validate URL safety - blocks localhost, private IPs, and file:// URLs.

    Only ``http``/``https`` URLs with a hostname are accepted. IP-literal
    hosts are rejected when they are loopback, private, link-local,
    multicast, reserved or unspecified (IPv4 and IPv6). Hosts that look like
    legacy numeric IPv4 forms (``127.1``, ``2130706433``, ``0x7f.0.0.1``) are
    rejected because resolvers may interpret them as addresses.

    Note: host names are not resolved here, so a public name that resolves
    to an internal address is not detected.

    Returns:
        True when the URL may be fetched, False otherwise (including when
        the URL cannot be parsed).
    """
    try:
        parsed = urlparse(url)
        scheme = parsed.scheme
        hostname = parsed.hostname
    except (ValueError, TypeError, AttributeError):
        return False  # Fail closed on anything unparsable

    # Must be HTTP/HTTPS
    if scheme not in ("http", "https"):
        return False

    # Must have a hostname (a trailing dot denotes the same host)
    if not isinstance(hostname, str):
        return False
    hostname = hostname.rstrip(".")
    if not hostname:
        return False

    # Block localhost names
    if hostname == "localhost" or hostname.endswith(".localhost"):
        return False

    try:
        address = ipaddress.ip_address(hostname)
    except ValueError:
        # Not a canonical IP literal. No public TLD is purely numeric or
        # hexadecimal, so such a last label means a legacy IPv4 spelling.
        last_label = hostname.rsplit(".", 1)[-1]
        if last_label.isdigit() or last_label.startswith("0x"):
            return False
        return True

    return not _is_blocked_ip(address)


class _UnsafeURLError(Exception):
    """Raised when a request (including a redirect hop) targets an unsafe URL."""


async def _reject_unsafe_request(request: httpx.Request) -> None:
    """httpx request hook: refuse any request whose URL fails `_is_safe_url`.

    Registered as a request event hook so that redirect targets are
    validated as well as the initial URL.
    """
    target = str(request.url)
    if not _is_safe_url(target):
        raise _UnsafeURLError(f"Blocked unsafe redirect target - {target}")


async def _fetch_page(url: str) -> httpx.Response:
    """GET ``url`` (following redirects) and return the fully-read response.

    Raises:
        httpx.TimeoutException: When the request exceeds the 10 second timeout.
        httpx.HTTPStatusError: On a 4xx/5xx response.
        _UnsafeURLError: When the URL or a redirect target is unsafe.
    """
    async with httpx.AsyncClient(
        timeout=10.0,
        follow_redirects=True,
        limits=httpx.Limits(max_connections=5),
        headers=_WEB_HEADERS,
        event_hooks={"request": [_reject_unsafe_request]},
    ) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response


def _extract_title_and_text(html: bytes) -> Tuple[Optional[str], str]:
    """Parse HTML and return ``(title, text)``.

    The title is whitespace-normalised and is ``None`` when the page has no
    usable ``<title>``. The text has scripts, styles and page chrome removed
    and blank lines dropped; it is not truncated.
    """
    soup = BeautifulSoup(html, "html.parser")

    # Remove unwanted elements
    for element in soup(_NON_CONTENT_ELEMENTS):
        element.decompose()

    # Extract text and clean up excessive whitespace
    raw_text = soup.get_text(separator="\n", strip=True)
    text = "\n".join(
        line.strip() for line in raw_text.split("\n") if line.strip()
    )

    # `soup.title.string` is None for empty or nested titles, so collect the
    # text instead and collapse internal whitespace/newlines.
    title = ""
    if soup.title is not None:
        title = " ".join(soup.title.get_text(" ", strip=True).split())

    return (title or None), text


# ---------------------------------------------------------------------------
# File / system tools
# ---------------------------------------------------------------------------

@tool(
    name="read_file",
    description=(
        "Read the contents of a file. Use this to view configuration files, "
        "code, or any text file."
    ),
    input_schema={
        "file_path": str,
    },
)
async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Read and return file contents, truncated to `_MAX_TOOL_OUTPUT` chars."""
    file_path = args["file_path"]
    path = Path(file_path)

    if not path.exists():
        return _error(f"Error: File not found: {file_path}")

    try:
        content = path.read_text(encoding="utf-8")
    except Exception as e:
        return _error(f"Error reading file: {str(e)}")

    if len(content) > _MAX_TOOL_OUTPUT:
        content = content[:_MAX_TOOL_OUTPUT] + "\n... (file truncated)"
    return _ok(f"Content of {file_path}:\n\n{content}")


@tool(
    name="write_file",
    description=(
        "Write content to a file. Creates a new file or overwrites existing "
        "file completely."
    ),
    input_schema={
        "file_path": str,
        "content": str,
    },
)
async def write_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Write content to a file, creating parent directories as needed."""
    file_path = args["file_path"]
    content = args["content"]
    path = Path(file_path)

    try:
        # Create parent directories if they don't exist
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content, encoding="utf-8")
    except Exception as e:
        return _error(f"Error writing file: {str(e)}")

    return _ok(f"Successfully wrote to {file_path} ({len(content)} characters)")


@tool(
    name="edit_file",
    description=(
        "Edit a file by replacing specific text. Use this to make targeted "
        "changes without rewriting the entire file."
    ),
    input_schema={
        "file_path": str,
        "old_text": str,
        "new_text": str,
    },
)
async def edit_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Edit file by replacing the first occurrence of ``old_text``."""
    file_path = args["file_path"]
    old_text = args["old_text"]
    new_text = args["new_text"]
    path = Path(file_path)

    if not path.exists():
        return _error(f"Error: File not found: {file_path}")

    try:
        content = path.read_text(encoding="utf-8")
        if old_text not in content:
            return _error(
                f"Error: Text not found in file. Could not find:\n{old_text[:100]}..."
            )
        path.write_text(content.replace(old_text, new_text, 1), encoding="utf-8")
    except Exception as e:
        return _error(f"Error editing file: {str(e)}")

    return _ok(f"Successfully edited {file_path}. Replaced 1 occurrence.")


@tool(
    name="list_directory",
    description=(
        "List files and directories in a given path. Useful for exploring "
        "the file system."
    ),
    input_schema={
        "path": str,
    },
)
async def list_directory_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """List directory contents (sorted), with sizes for files."""
    dir_path = Path(args.get("path", "."))

    if not dir_path.exists():
        return _error(f"Error: Directory not found: {dir_path}")
    if not dir_path.is_dir():
        return _error(f"Error: Not a directory: {dir_path}")

    try:
        items = []
        for item in sorted(dir_path.iterdir()):
            if item.is_dir():
                items.append(f" DIR  {item.name}")
            else:
                items.append(f" FILE {item.name} ({item.stat().st_size} bytes)")
    except Exception as e:
        return _error(f"Error listing directory: {str(e)}")

    if not items:
        return _ok(f"Directory {dir_path} is empty")
    return _ok(f"Contents of {dir_path}:\n" + "\n".join(items))


@tool(
    name="run_command",
    description=(
        "Execute a shell command. Use for git operations, running scripts, "
        "installing packages, etc."
    ),
    input_schema={
        "command": str,
        "working_dir": str,
    },
)
async def run_command_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a shell command and report exit code, stdout and stderr.

    Each stream is truncated to `_MAX_TOOL_OUTPUT` characters. The result is
    flagged as an error when the command exits non-zero or times out (30 s).
    """
    command = args["command"]
    # An empty working_dir would make subprocess fail; treat it as ".".
    working_dir = args.get("working_dir") or "."

    try:
        # SECURITY: shell=True is intentional - this tool's purpose is to run
        # an arbitrary shell command line supplied by the agent. Restrict who
        # can invoke this tool rather than relying on input sanitisation.
        # NOTE(review): subprocess.run blocks the event loop for up to 30 s.
        result = subprocess.run(
            command,
            shell=True,
            cwd=working_dir,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        return _error("Error: Command timed out after 30 seconds")
    except Exception as e:
        return _error(f"Error running command: {str(e)}")

    sections = []
    for label, stream in (("STDOUT", result.stdout), ("STDERR", result.stderr)):
        if not stream:
            continue
        if len(stream) > _MAX_TOOL_OUTPUT:
            stream = stream[:_MAX_TOOL_OUTPUT] + f"\n... ({label.lower()} truncated)"
        sections.append(f"{label}:\n{stream}")

    text = f"Command exited with code {result.returncode}"
    if sections:
        text = text + "\n\n" + "\n\n".join(sections)

    return {
        "content": [{"type": "text", "text": text}],
        "isError": result.returncode != 0,
    }


# ---------------------------------------------------------------------------
# Web tools
# ---------------------------------------------------------------------------

@tool(
    name="web_fetch",
    description=(
        "Fetch and parse content from a web page. Returns the page text "
        "content for analysis. Use this to get real-time information from "
        "the web."
    ),
    input_schema={
        "url": str,
    },
)
async def web_fetch_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch webpage and return parsed text content.

    This is a zero-cost MCP tool - it fetches the HTML and converts to text,
    then returns it to the main Agent SDK query for processing (no extra API
    cost).
    """
    url = args["url"]

    # Security: Validate URL
    if not _is_safe_url(url):
        return _error(
            f"Error: Blocked unsafe URL - {url}\n\n"
            "Only http/https URLs to public domains are allowed."
        )

    try:
        response = await _fetch_page(url)

        # Check size limit (applied after download)
        page_size = len(response.content)
        if page_size > _MAX_WEB_PAGE_SIZE:
            return _error(
                f"Error: Page too large ({page_size} bytes, max {_MAX_WEB_PAGE_SIZE})"
            )

        page_title, text = _extract_title_and_text(response.content)
        title = page_title or "No title"

        # Truncate if needed
        if len(text) > _MAX_WEB_TEXT:
            text = text[:_MAX_WEB_TEXT] + "\n\n... (content truncated, page is very long)"

        return _ok(f"Fetched: {response.url}\nTitle: {title}\n\n{text}")

    except httpx.TimeoutException:
        return _error(f"Error: Request to {url} timed out after 10 seconds")
    except httpx.HTTPStatusError as e:
        return _error(f"Error: HTTP {e.response.status_code} - {url}")
    except Exception as e:
        return _error(f"Error fetching webpage: {str(e)}")


# ---------------------------------------------------------------------------
# Zettelkasten tools
# ---------------------------------------------------------------------------

@tool(
    name="fleeting_note",
    description=(
        "Quickly capture a thought or idea as a fleeting note in your "
        "zettelkasten. Use this for quick captures that can be processed "
        "later into permanent notes."
    ),
    input_schema={
        "content": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def fleeting_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a fleeting note (quick capture) in the zettelkasten vault.

    Zero-cost MCP tool for instant thought capture. Perfect for ADHD-friendly
    quick notes. The title is the first line of the content (max 50 chars),
    or "Quick Note" when that line is empty.
    """
    content = args["content"]
    tags = _add_tags(_parse_tags(args.get("tags", "")), "fleeting")

    try:
        _ensure_vault_structure()

        # Generate note ID and extract title from first line
        note_id = _generate_note_id()
        title = content.split("\n", 1)[0].strip()[:50] or "Quick Note"

        note_path = _new_note_path(_VAULT_FLEETING, note_id, title)

        # Write note with front matter
        frontmatter = _create_frontmatter(note_id, title, tags, "fleeting")
        note_path.write_text(frontmatter + content, encoding="utf-8")

        return _ok(
            f"Created fleeting note: {note_path.name}\nID: {note_id}\nLocation: {note_path}"
        )
    except Exception as e:
        return _error(f"Error creating fleeting note: {str(e)}")


@tool(
    name="daily_note",
    description=(
        "Add an entry to today's daily note. Use this for journaling, "
        "logging activities, or tracking daily thoughts."
    ),
    input_schema={
        "entry": str,
    },
)
async def daily_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Append timestamped entry to today's daily note.

    Zero-cost MCP tool for daily journaling and activity logging. Creates the
    note (with front matter and a date heading) on the first entry of the day.
    """
    entry = args["entry"]

    try:
        _ensure_vault_structure()

        # Get today's date
        now = datetime.now()
        date_str = now.strftime("%Y-%m-%d")
        time_str = now.strftime("%H:%M")

        note_path = _VAULT_DAILY / f"{date_str}.md"

        if note_path.exists():
            # Append to existing note without rewriting what is already there
            with note_path.open("a", encoding="utf-8") as note_file:
                note_file.write(f"\n## {time_str}\n{entry}\n")
        else:
            # Create new daily note with front matter
            note_path.write_text(
                "---\n"
                f"date: {date_str}\n"
                "type: daily\n"
                "tags: [daily-note]\n"
                "---\n"
                "\n"
                f"# {date_str}\n"
                "\n"
                f"## {time_str}\n"
                f"{entry}\n",
                encoding="utf-8",
            )

        return _ok(
            f"Added entry to daily note: {date_str}\nTime: {time_str}\nLocation: {note_path}"
        )
    except Exception as e:
        return _error(f"Error updating daily note: {str(e)}")


@tool(
    name="literature_note",
    description=(
        "Create a literature note from a web article. Fetches the article, "
        "extracts key points, and creates a properly formatted zettelkasten "
        "note with source citation."
    ),
    input_schema={
        "url": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def literature_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create literature note from web article.

    Zero-cost MCP tool. Combines web_fetch with note creation: the page text
    (truncated to `_MAX_WEB_TEXT` chars) is stored together with its source
    URL and capture date.
    """
    url = args["url"]
    tags = _add_tags(_parse_tags(args.get("tags", "")), "literature", "web-article")

    try:
        _ensure_vault_structure()

        # Fetch article content using web_fetch logic
        if not _is_safe_url(url):
            return _error(f"Error: Blocked unsafe URL - {url}")

        response = await _fetch_page(url)
        if len(response.content) > _MAX_WEB_PAGE_SIZE:
            return _error("Error: Page too large")

        # Parse content
        page_title, page_text = _extract_title_and_text(response.content)
        title = page_title or "Untitled Article"
        text = page_text[:_MAX_WEB_TEXT]

        # Generate note
        note_id = _generate_note_id()
        captured = datetime.now().strftime("%Y-%m-%d")
        note_content = "\n".join([
            f"# {title}",
            "",
            f"**Source**: {url}",
            f"**Captured**: {captured}",
            "",
            "## Content",
            "",
            text,
            "",
            "## Notes",
            "",
            "(Add your thoughts, key takeaways, and connections to other notes here)",
            "",
            "## Related",
            "",
            "- ",
            "",
            "---",
            "*This is a literature note. Process it into permanent notes with key insights.*",
            "",
        ])

        note_path = _new_note_path(_VAULT_LITERATURE, note_id, title)

        # Write with frontmatter
        frontmatter = _create_frontmatter(note_id, title, tags, "literature")
        note_path.write_text(frontmatter + note_content, encoding="utf-8")

        return _ok(
            f"Created literature note from article\nTitle: {title}\nID: {note_id}\nLocation: {note_path}"
        )
    except httpx.TimeoutException:
        return _error("Error: Request timed out")
    except Exception as e:
        return _error(f"Error creating literature note: {str(e)}")


@tool(
    name="permanent_note",
    description=(
        "Create a permanent note with automatic link suggestions to related "
        "notes. Use this for refined, well-thought-out notes that form the "
        "core of your knowledge base."
    ),
    input_schema={
        "title": str,
        "content": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def permanent_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create permanent note with smart linking suggestions.

    Zero-cost MCP tool. Related notes are found with hybrid search when the
    memory system is available, otherwise with a keyword scan, and are added
    to the note as wiki-links.
    """
    title = args["title"]
    content = args["content"]
    tags = _add_tags(_parse_tags(args.get("tags", "")), "permanent")

    try:
        _ensure_vault_structure()

        note_id = _generate_note_id()

        # Search for related notes using hybrid search (vector + keyword)
        related_notes = _find_related_notes_hybrid(title, content, max_results=5)

        # Build related section
        if related_notes:
            related_lines = [f"- [[{note_title}]]\n" for note_title, _ in related_notes]
        else:
            related_lines = ["- (No related notes found yet)\n"]
        related_section = "\n## Related Notes\n" + "".join(related_lines)

        # Create full note content
        created = datetime.now().strftime("%Y-%m-%d %H:%M")
        note_content = (
            f"# {title}\n"
            "\n"
            f"{content}\n"
            f"{related_section}\n"
            "---\n"
            f"*Created: {created}*\n"
        )

        note_path = _new_note_path(_VAULT_PERMANENT, note_id, title)

        # Write with frontmatter
        frontmatter = _create_frontmatter(note_id, title, tags, "permanent")
        note_path.write_text(frontmatter + note_content, encoding="utf-8")

        # Build result message
        result_msg = f"Created permanent note: {title}\nID: {note_id}\nLocation: {note_path}"
        if related_notes:
            result_msg += f"\n\nSuggested links ({len(related_notes)} related notes):"
            result_msg += "".join(
                f"\n- [[{note_title}]] ({matches} keyword matches)"
                for note_title, matches in related_notes
            )

        return _ok(result_msg)
    except Exception as e:
        return _error(f"Error creating permanent note: {str(e)}")


@tool(
    name="search_by_tags",
    description=(
        "Search zettelkasten vault by tags. Find all notes with specific "
        "tags or tag combinations."
    ),
    input_schema={
        "tags": str,  # Comma-separated tags to search for
        "match_all": bool,  # If true, note must have ALL tags. If false, ANY tag matches (optional, default false)
        "limit": int,  # Max results (optional, default 10)
    },
)
async def search_by_tags_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search vault by tags.

    Zero-cost MCP tool. Searches YAML frontmatter for tag matches
    (case-insensitive). Empty tags in the query are ignored; a query with no
    usable tags is rejected instead of matching untagged notes.
    """
    tags_str = args["tags"]
    match_all = args.get("match_all", False)
    limit = args.get("limit", 10)

    try:
        _ensure_vault_structure()

        # Parse search tags
        search_tags = [tag.lower() for tag in _parse_tags(tags_str)]
        if not search_tags:
            return _error("Error: No tags provided to search for")

        matches_query = all if match_all else any

        results = []
        for vault_dir in _ALL_VAULT_DIRS:
            if not vault_dir.exists():
                continue
            for note_path in vault_dir.glob("*.md"):
                try:
                    content = note_path.read_text(encoding="utf-8")
                except (OSError, UnicodeDecodeError):
                    continue  # Unreadable notes are skipped
                note_tags = _extract_frontmatter_tags(content)
                if note_tags is None:
                    continue
                if matches_query(tag in note_tags for tag in search_tags):
                    results.append(note_path.name)

        # Limit results
        results = results[:limit]

        if not results:
            match_type = "all" if match_all else "any"
            return _ok(f"No notes found with {match_type} of tags: {tags_str}")

        listing = "".join(
            f"{i}. {note_name}\n" for i, note_name in enumerate(results, 1)
        )
        return _ok(f"Found {len(results)} note(s) with tags '{tags_str}':\n\n" + listing)
    except Exception as e:
        return _error(f"Error searching by tags: {str(e)}")


def _keyword_search_vault(query: str) -> List[Dict[str, str]]:
    """Case-insensitive substring search over all vault notes.

    Returns one result per note whose text contains ``query`` on a single
    line, with a snippet of up to two lines of context either side of the
    first matching line (capped at 300 characters).
    """
    query_lower = query.lower()
    results = []
    for vault_dir in _ALL_VAULT_DIRS:
        if not vault_dir.exists():
            continue
        for note_path in vault_dir.glob("*.md"):
            try:
                content = note_path.read_text(encoding="utf-8")
            except (OSError, UnicodeDecodeError):
                continue  # Unreadable notes are skipped
            if query_lower not in content.lower():
                continue
            lines = content.split("\n")
            for index, line in enumerate(lines):
                if query_lower in line.lower():
                    start = max(0, index - 2)
                    end = min(len(lines), index + 3)
                    results.append({
                        "file": note_path.name,
                        "path": str(note_path),
                        "snippet": "\n".join(lines[start:end])[:300],
                    })
                    break
    return results


def _filter_results_by_tags(
    results: List[Dict[str, str]], filter_tags: List[str]
) -> List[Dict[str, str]]:
    """Keep results whose note front matter carries any of ``filter_tags``."""
    kept = []
    for result in results:
        try:
            content = Path(result["path"]).read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            continue  # Notes that cannot be read cannot be confirmed; drop
        note_tags = _extract_frontmatter_tags(content)
        if note_tags and any(tag in note_tags for tag in filter_tags):
            kept.append(result)
    return kept


@tool(
    name="search_vault",
    description=(
        "Search your zettelkasten vault for notes matching a query. Returns "
        "relevant notes with context. Optionally filter by tags."
    ),
    input_schema={
        "query": str,
        "tags": str,  # Optional comma-separated tags to filter by
        "limit": int,  # Max results (optional, default 5)
    },
)
async def search_vault_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search zettelkasten vault using hybrid search (vector + keyword).

    Zero-cost MCP tool. Uses hybrid search when available for better results
    and falls back to a simple keyword search when hybrid search is
    unavailable, fails, or returns nothing.
    """
    query = args["query"]
    tags_filter = args.get("tags", "")
    limit = args.get("limit", 5)

    try:
        _ensure_vault_structure()

        # Try hybrid search first
        memory = _get_memory_system()
        results = []

        if memory:
            try:
                _index_vault(memory, _ALL_VAULT_DIRS)

                # Over-fetch so a tag filter still leaves enough results
                search_results = memory.search_hybrid(query, max_results=limit * 2)

                for result in search_results:
                    note_path = Path(result["path"])
                    results.append({
                        "file": note_path.name,
                        "path": str(note_path),
                        "snippet": result.get("snippet", "")[:300],
                    })
            except Exception:
                # Deliberate best-effort: discard partial results and fall
                # back to the simple search below.
                results = []

        # Fallback: simple keyword search
        if not results:
            results = _keyword_search_vault(query)

        # Filter by tags if specified
        filter_tags = [tag.lower() for tag in _parse_tags(tags_filter)]
        if filter_tags:
            results = _filter_results_by_tags(results, filter_tags)

        # Limit results
        results = results[:limit]

        if not results:
            tag_msg = f" with tags '{tags_filter}'" if tags_filter else ""
            return _ok(f"No notes found matching: {query}{tag_msg}")

        # Format results
        tag_msg = f" (filtered by tags: {tags_filter})" if tags_filter else ""
        result_text = f"Found {len(results)} note(s) matching '{query}'{tag_msg}:\n\n"
        result_text += "".join(
            f"{i}. {result['file']}\n {result['snippet']}\n\n"
            for i, result in enumerate(results, 1)
        )
        return _ok(result_text)
    except Exception as e:
        return _error(f"Error searching vault: {str(e)}")


# Create the MCP server with all tools (file/system + web + zettelkasten)
file_system_server = create_sdk_mcp_server(
    name="file_system",
    version="1.4.0",
    tools=[
        read_file_tool,
        write_file_tool,
        edit_file_tool,
        list_directory_tool,
        run_command_tool,
        web_fetch_tool,
        fleeting_note_tool,
        daily_note_tool,
        literature_note_tool,
        permanent_note_tool,
        search_vault_tool,
        search_by_tags_tool,
    ]
)