"""MCP Tools - In-process tools using Claude Agent SDK. These tools run directly in the Python process for better performance compared to the traditional tool execution flow. File and system tools are ideal candidates for MCP since they don't require external APIs. """ from pathlib import Path import subprocess from typing import Any, Dict, List, Optional from urllib.parse import urlparse from datetime import datetime from claude_agent_sdk import tool, create_sdk_mcp_server import httpx from bs4 import BeautifulSoup # Import memory system for hybrid search try: from memory_system import MemorySystem MEMORY_AVAILABLE = True except ImportError: MEMORY_AVAILABLE = False # Maximum characters of tool output to return (prevents token explosion) _MAX_TOOL_OUTPUT = 5000 # Maximum page size for web fetching (500KB) _MAX_WEB_PAGE_SIZE = 500_000 # Maximum text content from web page (10,000 chars ≈ 2,500 tokens) _MAX_WEB_TEXT = 10_000 # Zettelkasten vault paths _VAULT_ROOT = Path("memory_workspace/obsidian") _VAULT_FLEETING = _VAULT_ROOT / "fleeting" _VAULT_DAILY = _VAULT_ROOT / "daily" _VAULT_PERMANENT = _VAULT_ROOT / "permanent" _VAULT_LITERATURE = _VAULT_ROOT / "literature" def _generate_note_id() -> str: """Generate unique timestamp-based note ID (YYYYMMDDHHmmss).""" return datetime.now().strftime("%Y%m%d%H%M%S") def _create_frontmatter(note_id: str, title: str, tags: list = None, note_type: str = "fleeting") -> str: """Create YAML front matter for zettelkasten note.""" now = datetime.now() tags_str = ", ".join(tags) if tags else "" return f"""--- id: {note_id} title: {title} created: {now.strftime("%Y-%m-%dT%H:%M:%S")} modified: {now.strftime("%Y-%m-%dT%H:%M:%S")} type: {note_type} tags: [{tags_str}] --- """ def _ensure_vault_structure(): """Ensure zettelkasten vault directories exist.""" for dir_path in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: dir_path.mkdir(parents=True, exist_ok=True) def _get_memory_system() -> Optional['MemorySystem']: """Get memory system instance for hybrid search.""" if not MEMORY_AVAILABLE: return None try: # Initialize memory system pointing to obsidian vault memory = MemorySystem(workspace_dir=_VAULT_ROOT) return memory except Exception: return None def _find_related_notes_hybrid(title: str, content: str, max_results: int = 5) -> List[tuple]: """Find related notes using hybrid search (vector + keyword). Returns list of (note_title, score) tuples. """ memory = _get_memory_system() if memory: # Use hybrid search for better results try: # Index the vault if needed for vault_dir in [_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE]: if vault_dir.exists(): for note_path in vault_dir.glob("*.md"): memory.index_file(note_path) # Search with content + title as query query = f"{title} {content[:500]}" # Limit content to first 500 chars results = memory.search_hybrid(query, max_results=max_results) # Convert results to (title, score) tuples related = [] for result in results: note_path = Path(result["path"]) note_title = note_path.stem if ' - ' in note_title: note_title = note_title.split(' - ', 1)[1] # Use snippet match percentage as score score = result.get("snippet", "").count("**") // 2 # Count of matches related.append((note_title, score)) return related except Exception: pass # Fall back to keyword search # Fallback: keyword search related_notes = [] search_terms = title.lower().split() + content.lower().split()[:20] search_terms = [term for term in search_terms if len(term) > 4] for vault_dir in [_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE]: if not vault_dir.exists(): continue for note_path in vault_dir.glob("*.md"): try: note_content = note_path.read_text(encoding="utf-8").lower() matches = sum(1 for term in search_terms if term in note_content) if matches >= 2: note_title = note_path.stem if ' - ' in note_title: note_title = note_title.split(' - ', 1)[1] related_notes.append((note_title, matches)) except Exception: continue related_notes.sort(key=lambda x: x[1], reverse=True) return related_notes[:max_results] def _is_safe_url(url: str) -> bool: """Validate URL safety - blocks localhost, private IPs, and file:// URLs.""" try: parsed = urlparse(url) # Must be HTTP/HTTPS if parsed.scheme not in ['http', 'https']: return False # Must have a hostname if not parsed.hostname: return False # Block localhost and loopback IPs blocked_hosts = ['localhost', '127.0.0.1', '0.0.0.0', '::1'] if parsed.hostname in blocked_hosts: return False # Block private IP ranges (10.x.x.x, 192.168.x.x, 172.16-31.x.x) if parsed.hostname: parts = parsed.hostname.split('.') if len(parts) == 4 and parts[0].isdigit(): first_octet = int(parts[0]) # Check for private ranges if first_octet == 10: # 10.0.0.0/8 return False if first_octet == 192 and len(parts) > 1 and parts[1].isdigit() and int(parts[1]) == 168: # 192.168.0.0/16 return False if first_octet == 172 and len(parts) > 1 and parts[1].isdigit(): second_octet = int(parts[1]) if 16 <= second_octet <= 31: # 172.16.0.0/12 return False return True except Exception: return False @tool( name="read_file", description="Read the contents of a file. Use this to view configuration files, code, or any text file.", input_schema={ "file_path": str, }, ) async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Read and return file contents.""" file_path = args["file_path"] path = Path(file_path) if not path.exists(): return { "content": [{"type": "text", "text": f"Error: File not found: {file_path}"}], "isError": True } try: content = path.read_text(encoding="utf-8") if len(content) > _MAX_TOOL_OUTPUT: content = content[:_MAX_TOOL_OUTPUT] + "\n... (file truncated)" return { "content": [{"type": "text", "text": f"Content of {file_path}:\n\n{content}"}] } except Exception as e: return { "content": [{"type": "text", "text": f"Error reading file: {str(e)}"}], "isError": True } @tool( name="write_file", description="Write content to a file. Creates a new file or overwrites existing file completely.", input_schema={ "file_path": str, "content": str, }, ) async def write_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Write content to a file.""" file_path = args["file_path"] content = args["content"] path = Path(file_path) try: # Create parent directories if they don't exist path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") return { "content": [{ "type": "text", "text": f"Successfully wrote to {file_path} ({len(content)} characters)" }] } except Exception as e: return { "content": [{"type": "text", "text": f"Error writing file: {str(e)}"}], "isError": True } @tool( name="edit_file", description="Edit a file by replacing specific text. Use this to make targeted changes without rewriting the entire file.", input_schema={ "file_path": str, "old_text": str, "new_text": str, }, ) async def edit_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Edit file by replacing text.""" file_path = args["file_path"] old_text = args["old_text"] new_text = args["new_text"] path = Path(file_path) if not path.exists(): return { "content": [{"type": "text", "text": f"Error: File not found: {file_path}"}], "isError": True } try: content = path.read_text(encoding="utf-8") if old_text not in content: return { "content": [{ "type": "text", "text": f"Error: Text not found in file. Could not find:\n{old_text[:100]}..." }], "isError": True } new_content = content.replace(old_text, new_text, 1) path.write_text(new_content, encoding="utf-8") return { "content": [{ "type": "text", "text": f"Successfully edited {file_path}. Replaced 1 occurrence." }] } except Exception as e: return { "content": [{"type": "text", "text": f"Error editing file: {str(e)}"}], "isError": True } @tool( name="list_directory", description="List files and directories in a given path. Useful for exploring the file system.", input_schema={ "path": str, }, ) async def list_directory_tool(args: Dict[str, Any]) -> Dict[str, Any]: """List directory contents.""" dir_path = Path(args.get("path", ".")) if not dir_path.exists(): return { "content": [{"type": "text", "text": f"Error: Directory not found: {dir_path}"}], "isError": True } if not dir_path.is_dir(): return { "content": [{"type": "text", "text": f"Error: Not a directory: {dir_path}"}], "isError": True } try: items = [] for item in sorted(dir_path.iterdir()): item_type = "DIR " if item.is_dir() else "FILE" size = "" if item.is_dir() else f" ({item.stat().st_size} bytes)" items.append(f" {item_type} {item.name}{size}") if not items: return { "content": [{"type": "text", "text": f"Directory {dir_path} is empty"}] } return { "content": [{ "type": "text", "text": f"Contents of {dir_path}:\n" + "\n".join(items) }] } except Exception as e: return { "content": [{"type": "text", "text": f"Error listing directory: {str(e)}"}], "isError": True } @tool( name="run_command", description="Execute a shell command. Use for git operations, running scripts, installing packages, etc.", input_schema={ "command": str, "working_dir": str, }, ) async def run_command_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Execute a shell command.""" command = args["command"] working_dir = args.get("working_dir", ".") try: result = subprocess.run( command, shell=True, cwd=working_dir, capture_output=True, text=True, timeout=30, ) output = [] if result.stdout: stdout = result.stdout if len(stdout) > _MAX_TOOL_OUTPUT: stdout = stdout[:_MAX_TOOL_OUTPUT] + "\n... (stdout truncated)" output.append(f"STDOUT:\n{stdout}") if result.stderr: stderr = result.stderr if len(stderr) > _MAX_TOOL_OUTPUT: stderr = stderr[:_MAX_TOOL_OUTPUT] + "\n... (stderr truncated)" output.append(f"STDERR:\n{stderr}") status = f"Command exited with code {result.returncode}" if not output: text = status else: text = status + "\n\n" + "\n\n".join(output) return { "content": [{"type": "text", "text": text}], "isError": result.returncode != 0 } except subprocess.TimeoutExpired: return { "content": [{"type": "text", "text": "Error: Command timed out after 30 seconds"}], "isError": True } except Exception as e: return { "content": [{"type": "text", "text": f"Error running command: {str(e)}"}], "isError": True } @tool( name="web_fetch", description="Fetch and parse content from a web page. Returns the page text content for analysis. Use this to get real-time information from the web.", input_schema={ "url": str, }, ) async def web_fetch_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Fetch webpage and return parsed text content. This is a zero-cost MCP tool - it fetches the HTML and converts to text, then returns it to the main Agent SDK query for processing (no extra API cost). """ url = args["url"] # Security: Validate URL if not _is_safe_url(url): return { "content": [{ "type": "text", "text": f"Error: Blocked unsafe URL - {url}\n\nOnly http/https URLs to public domains are allowed." }], "isError": True } try: # Fetch page with timeout and size limit headers = { "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)" } async with httpx.AsyncClient( timeout=10.0, follow_redirects=True, limits=httpx.Limits(max_connections=5), headers=headers ) as client: response = await client.get(url) response.raise_for_status() # Check size limit if len(response.content) > _MAX_WEB_PAGE_SIZE: return { "content": [{ "type": "text", "text": f"Error: Page too large ({len(response.content)} bytes, max {_MAX_WEB_PAGE_SIZE})" }], "isError": True } # Parse HTML to text soup = BeautifulSoup(response.content, 'html.parser') # Remove unwanted elements for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']): element.decompose() # Extract text text = soup.get_text(separator='\n', strip=True) # Clean up excessive whitespace lines = [line.strip() for line in text.split('\n') if line.strip()] text = '\n'.join(lines) # Truncate if needed if len(text) > _MAX_WEB_TEXT: text = text[:_MAX_WEB_TEXT] + "\n\n... (content truncated, page is very long)" # Get title if available title = soup.title.string if soup.title else "No title" return { "content": [{ "type": "text", "text": f"Fetched: {response.url}\nTitle: {title}\n\n{text}" }] } except httpx.TimeoutException: return { "content": [{ "type": "text", "text": f"Error: Request to {url} timed out after 10 seconds" }], "isError": True } except httpx.HTTPStatusError as e: return { "content": [{ "type": "text", "text": f"Error: HTTP {e.response.status_code} - {url}" }], "isError": True } except Exception as e: return { "content": [{ "type": "text", "text": f"Error fetching webpage: {str(e)}" }], "isError": True } @tool( name="fleeting_note", description="Quickly capture a thought or idea as a fleeting note in your zettelkasten. Use this for quick captures that can be processed later into permanent notes.", input_schema={ "content": str, "tags": str, # Comma-separated tags (optional) }, ) async def fleeting_note_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Create a fleeting note (quick capture) in the zettelkasten vault. Zero-cost MCP tool for instant thought capture. Perfect for ADHD-friendly quick notes. """ content = args["content"] tags_str = args.get("tags", "") try: _ensure_vault_structure() # Generate note ID and extract title from first line note_id = _generate_note_id() lines = content.split('\n', 1) title = lines[0][:50] if lines else "Quick Note" # Parse tags tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else [] tags.append("fleeting") # Always tag as fleeting # Create note file filename = f"{note_id} - {title.replace(':', '').replace('/', '-')}.md" note_path = _VAULT_FLEETING / filename # Write note with front matter frontmatter = _create_frontmatter(note_id, title, tags, "fleeting") full_content = frontmatter + content note_path.write_text(full_content, encoding="utf-8") return { "content": [{ "type": "text", "text": f"Created fleeting note: {filename}\nID: {note_id}\nLocation: {note_path}" }] } except Exception as e: return { "content": [{"type": "text", "text": f"Error creating fleeting note: {str(e)}"}], "isError": True } @tool( name="daily_note", description="Add an entry to today's daily note. Use this for journaling, logging activities, or tracking daily thoughts.", input_schema={ "entry": str, }, ) async def daily_note_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Append timestamped entry to today's daily note. Zero-cost MCP tool for daily journaling and activity logging. """ entry = args["entry"] try: _ensure_vault_structure() # Get today's date now = datetime.now() date_str = now.strftime("%Y-%m-%d") time_str = now.strftime("%H:%M") # Daily note path note_path = _VAULT_DAILY / f"{date_str}.md" # Create or update daily note if note_path.exists(): # Append to existing note current_content = note_path.read_text(encoding="utf-8") new_entry = f"\n## {time_str}\n{entry}\n" updated_content = current_content + new_entry else: # Create new daily note with front matter frontmatter = f"""--- date: {date_str} type: daily tags: [daily-note] --- # {date_str} ## {time_str} {entry} """ updated_content = frontmatter note_path.write_text(updated_content, encoding="utf-8") return { "content": [{ "type": "text", "text": f"Added entry to daily note: {date_str}\nTime: {time_str}\nLocation: {note_path}" }] } except Exception as e: return { "content": [{"type": "text", "text": f"Error updating daily note: {str(e)}"}], "isError": True } @tool( name="literature_note", description="Create a literature note from a web article. Fetches the article, extracts key points, and creates a properly formatted zettelkasten note with source citation.", input_schema={ "url": str, "tags": str, # Comma-separated tags (optional) }, ) async def literature_note_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Create literature note from web article. Zero-cost MCP tool. Combines web_fetch with note creation. """ url = args["url"] tags_str = args.get("tags", "") try: _ensure_vault_structure() # Fetch article content using web_fetch logic if not _is_safe_url(url): return { "content": [{ "type": "text", "text": f"Error: Blocked unsafe URL - {url}" }], "isError": True } # Fetch the article headers = { "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)" } async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, headers=headers) as client: response = await client.get(url) response.raise_for_status() if len(response.content) > _MAX_WEB_PAGE_SIZE: return { "content": [{ "type": "text", "text": f"Error: Page too large" }], "isError": True } # Parse content soup = BeautifulSoup(response.content, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']): element.decompose() text = soup.get_text(separator='\n', strip=True) lines = [line.strip() for line in text.split('\n') if line.strip()] text = '\n'.join(lines)[:_MAX_WEB_TEXT] title = soup.title.string if soup.title else "Untitled Article" # Generate note note_id = _generate_note_id() tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else [] tags.extend(["literature", "web-article"]) # Create note content note_content = f"""# {title} **Source**: {url} **Captured**: {datetime.now().strftime("%Y-%m-%d")} ## Content {text} ## Notes (Add your thoughts, key takeaways, and connections to other notes here) ## Related - --- *This is a literature note. Process it into permanent notes with key insights.* """ # Create filename and path filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md" note_path = _VAULT_LITERATURE / filename # Write with frontmatter frontmatter = _create_frontmatter(note_id, title, tags, "literature") full_content = frontmatter + note_content note_path.write_text(full_content, encoding="utf-8") return { "content": [{ "type": "text", "text": f"Created literature note from article\nTitle: {title}\nID: {note_id}\nLocation: {note_path}" }] } except httpx.TimeoutException: return { "content": [{"type": "text", "text": f"Error: Request timed out"}], "isError": True } except Exception as e: return { "content": [{"type": "text", "text": f"Error creating literature note: {str(e)}"}], "isError": True } @tool( name="permanent_note", description="Create a permanent note with automatic link suggestions to related notes. Use this for refined, well-thought-out notes that form the core of your knowledge base.", input_schema={ "title": str, "content": str, "tags": str, # Comma-separated tags (optional) }, ) async def permanent_note_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Create permanent note with smart linking suggestions. Zero-cost MCP tool. Uses simple search for now. Future: Will use hybrid search for better suggestions. """ title = args["title"] content = args["content"] tags_str = args.get("tags", "") try: _ensure_vault_structure() # Generate note ID note_id = _generate_note_id() # Parse tags tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else [] tags.append("permanent") # Search for related notes using hybrid search (vector + keyword) related_notes = _find_related_notes_hybrid(title, content, max_results=5) # Build related section related_section = "\n## Related Notes\n" if related_notes: for note_title, _ in related_notes: related_section += f"- [[{note_title}]]\n" else: related_section += "- (No related notes found yet)\n" # Create full note content note_content = f"""# {title} {content} {related_section} --- *Created: {datetime.now().strftime("%Y-%m-%d %H:%M")}* """ # Create filename filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md" note_path = _VAULT_PERMANENT / filename # Write with frontmatter frontmatter = _create_frontmatter(note_id, title, tags, "permanent") full_content = frontmatter + note_content note_path.write_text(full_content, encoding="utf-8") # Build result message result_msg = f"Created permanent note: {title}\nID: {note_id}\nLocation: {note_path}" if related_notes: result_msg += f"\n\nSuggested links ({len(related_notes)} related notes):" for note_title, matches in related_notes: result_msg += f"\n- [[{note_title}]] ({matches} keyword matches)" return { "content": [{ "type": "text", "text": result_msg }] } except Exception as e: return { "content": [{"type": "text", "text": f"Error creating permanent note: {str(e)}"}], "isError": True } @tool( name="search_by_tags", description="Search zettelkasten vault by tags. Find all notes with specific tags or tag combinations.", input_schema={ "tags": str, # Comma-separated tags to search for "match_all": bool, # If true, note must have ALL tags. If false, ANY tag matches (optional, default false) "limit": int, # Max results (optional, default 10) }, ) async def search_by_tags_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Search vault by tags. Zero-cost MCP tool. Searches YAML frontmatter for tag matches. """ tags_str = args["tags"] match_all = args.get("match_all", False) limit = args.get("limit", 10) try: _ensure_vault_structure() # Parse search tags search_tags = [tag.strip().lower() for tag in tags_str.split(',')] results = [] for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: if not vault_dir.exists(): continue for note_path in vault_dir.glob("*.md"): try: content = note_path.read_text(encoding="utf-8") # Extract tags from frontmatter if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 2: frontmatter = parts[1] # Look for tags line for line in frontmatter.split('\n'): if line.strip().startswith('tags:'): tags_part = line.split(':', 1)[1].strip() # Remove brackets and split tags_part = tags_part.strip('[]') note_tags = [t.strip().lower() for t in tags_part.split(',')] # Check match if match_all: if all(tag in note_tags for tag in search_tags): results.append(note_path.name) else: if any(tag in note_tags for tag in search_tags): results.append(note_path.name) break except Exception: continue # Limit results results = results[:limit] if not results: match_type = "all" if match_all else "any" return { "content": [{ "type": "text", "text": f"No notes found with {match_type} of tags: {tags_str}" }] } result_text = f"Found {len(results)} note(s) with tags '{tags_str}':\n\n" for i, note_name in enumerate(results, 1): result_text += f"{i}. {note_name}\n" return { "content": [{ "type": "text", "text": result_text }] } except Exception as e: return { "content": [{"type": "text", "text": f"Error searching by tags: {str(e)}"}], "isError": True } @tool( name="search_vault", description="Search your zettelkasten vault for notes matching a query. Returns relevant notes with context. Optionally filter by tags.", input_schema={ "query": str, "tags": str, # Optional comma-separated tags to filter by "limit": int, # Max results (optional, default 5) }, ) async def search_vault_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Search zettelkasten vault using hybrid search (vector + keyword). Zero-cost MCP tool. Uses hybrid search when available for better results. """ query = args["query"] tags_filter = args.get("tags", "") limit = args.get("limit", 5) try: _ensure_vault_structure() # Try hybrid search first memory = _get_memory_system() results = [] if memory and MEMORY_AVAILABLE: try: # Index vault for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: if vault_dir.exists(): for note_path in vault_dir.glob("*.md"): memory.index_file(note_path) # Hybrid search search_results = memory.search_hybrid(query, max_results=limit * 2) # Convert to results format for result in search_results: note_path = Path(result["path"]) results.append({ "file": note_path.name, "path": str(note_path), "snippet": result.get("snippet", "")[:300] }) except Exception: pass # Fall back to simple search # Fallback: simple keyword search if not results: query_lower = query.lower() for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]: if not vault_dir.exists(): continue for note_path in vault_dir.glob("*.md"): try: content = note_path.read_text(encoding="utf-8") if query_lower in content.lower(): # Extract snippet lines = content.split('\n') for i, line in enumerate(lines): if query_lower in line.lower(): start = max(0, i - 2) end = min(len(lines), i + 3) snippet = '\n'.join(lines[start:end]) results.append({ "file": note_path.name, "path": str(note_path), "snippet": snippet[:300] }) break except Exception: continue # Filter by tags if specified if tags_filter: filter_tags = [t.strip().lower() for t in tags_filter.split(',')] filtered_results = [] for result in results: try: note_path = Path(result["path"]) content = note_path.read_text(encoding="utf-8") # Check tags in frontmatter if content.startswith("---"): parts = content.split("---", 2) if len(parts) >= 2: frontmatter = parts[1] for line in frontmatter.split('\n'): if line.strip().startswith('tags:'): tags_part = line.split(':', 1)[1].strip().strip('[]') note_tags = [t.strip().lower() for t in tags_part.split(',')] if any(tag in note_tags for tag in filter_tags): filtered_results.append(result) break except Exception: continue results = filtered_results # Limit results results = results[:limit] if not results: tag_msg = f" with tags '{tags_filter}'" if tags_filter else "" return { "content": [{ "type": "text", "text": f"No notes found matching: {query}{tag_msg}" }] } # Format results tag_msg = f" (filtered by tags: {tags_filter})" if tags_filter else "" result_text = f"Found {len(results)} note(s) matching '{query}'{tag_msg}:\n\n" for i, result in enumerate(results, 1): result_text += f"{i}. {result['file']}\n" result_text += f" {result['snippet']}\n\n" return { "content": [{ "type": "text", "text": result_text }] } except Exception as e: return { "content": [{"type": "text", "text": f"Error searching vault: {str(e)}"}], "isError": True } # ============================================ # Google and Weather Tools (MCP Migration) # ============================================ # Lazy-loaded Google clients _gmail_client: Optional[Any] = None _calendar_client: Optional[Any] = None _people_client: Optional[Any] = None def _initialize_google_clients(): """Lazy-load Google API clients when needed.""" global _gmail_client, _calendar_client, _people_client if _gmail_client is not None: return _gmail_client, _calendar_client, _people_client try: from google_tools.gmail_client import GmailClient from google_tools.calendar_client import CalendarClient from google_tools.people_client import PeopleClient from google_tools.oauth_manager import GoogleOAuthManager oauth_manager = GoogleOAuthManager() credentials = oauth_manager.get_credentials() if not credentials: return None, None, None _gmail_client = GmailClient(oauth_manager) _calendar_client = CalendarClient(oauth_manager) _people_client = PeopleClient(oauth_manager) return _gmail_client, _calendar_client, _people_client except Exception as e: print(f"[MCP Google] Failed to initialize: {e}") return None, None, None @tool( name="get_weather", description="Get current weather for a location using OpenWeatherMap API. Returns temperature, conditions, and description.", input_schema={"location": str}, ) async def get_weather(args: Dict[str, Any]) -> Dict[str, Any]: """Get current weather for a location using OpenWeatherMap API.""" location = args.get("location", "Phoenix, US") import os import requests api_key = os.getenv("OPENWEATHERMAP_API_KEY") if not api_key: return { "content": [{ "type": "text", "text": "Error: OPENWEATHERMAP_API_KEY not found in environment variables" }], "isError": True } try: base_url = "http://api.openweathermap.org/data/2.5/weather" params = { "q": location, "appid": api_key, "units": "imperial" } response = requests.get(base_url, params=params, timeout=10) response.raise_for_status() data = response.json() temp = data["main"]["temp"] feels_like = data["main"]["feels_like"] humidity = data["main"]["humidity"] conditions = data["weather"][0]["main"] description = data["weather"][0]["description"] city_name = data["name"] summary = ( f"Weather in {city_name}:\n" f"Temperature: {temp}°F (feels like {feels_like}°F)\n" f"Conditions: {conditions} - {description}\n" f"Humidity: {humidity}%" ) return { "content": [{"type": "text", "text": summary}] } except Exception as e: return { "content": [{ "type": "text", "text": f"Error getting weather: {str(e)}" }], "isError": True } @tool( name="send_email", description="Send an email via Gmail API. Requires prior OAuth setup (--setup-google).", input_schema={"to": str, "subject": str, "body": str, "cc": str, "reply_to_message_id": str}, ) async def send_email(args: Dict[str, Any]) -> Dict[str, Any]: """Send an email via Gmail API.""" to = args["to"] subject = args["subject"] body = args["body"] cc = args.get("cc") reply_to_message_id = args.get("reply_to_message_id") gmail_client, _, _ = _initialize_google_clients() if not gmail_client: return { "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], "isError": True } result = gmail_client.send_email( to=to, subject=subject, body=body, cc=cc, reply_to_message_id=reply_to_message_id, ) if result["success"]: msg_id = result.get("message_id", "unknown") text = f"Email sent successfully to {to}\nMessage ID: {msg_id}\nSubject: {subject}" else: text = f"Error sending email: {result.get('error', 'Unknown error')}" return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} @tool( name="read_emails", description="Search and read emails from Gmail using Gmail query syntax (e.g., 'from:user@example.com after:2026/02/10').", input_schema={"query": str, "max_results": int, "include_body": bool}, ) async def read_emails(args: Dict[str, Any]) -> Dict[str, Any]: """Search and read emails from Gmail.""" query = args.get("query", "") max_results = args.get("max_results", 10) include_body = args.get("include_body", False) gmail_client, _, _ = _initialize_google_clients() if not gmail_client: return { "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], "isError": True } result = gmail_client.search_emails(query=query, max_results=max_results, include_body=include_body) if result["success"]: summary = result.get("summary", "") if len(summary) > _MAX_TOOL_OUTPUT: summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" else: summary = f"Error reading emails: {result.get('error', 'Unknown error')}" return {"content": [{"type": "text", "text": summary}], "isError": not result["success"]} @tool( name="get_email", description="Get full content of a specific email by its Gmail message ID.", input_schema={"message_id": str, "format_type": str}, ) async def get_email(args: Dict[str, Any]) -> Dict[str, Any]: """Get full content of a specific email.""" message_id = args["message_id"] format_type = args.get("format_type", "text") gmail_client, _, _ = _initialize_google_clients() if not gmail_client: return { "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], "isError": True } result = gmail_client.get_email(message_id=message_id, format_type=format_type) if result["success"]: email_data = result.get("email", {}) text = ( f"From: {email_data.get('from', 'Unknown')}\n" f"To: {email_data.get('to', 'Unknown')}\n" f"Subject: {email_data.get('subject', 'No subject')}\n" f"Date: {email_data.get('date', 'Unknown')}\n\n" f"{email_data.get('body', 'No content')}" ) if len(text) > _MAX_TOOL_OUTPUT: text = text[:_MAX_TOOL_OUTPUT] + "\n... (content truncated)" else: text = f"Error getting email: {result.get('error', 'Unknown error')}" return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} @tool( name="read_calendar", description="Read upcoming events from Google Calendar. Shows events from today onwards.", input_schema={"days_ahead": int, "calendar_id": str, "max_results": int}, ) async def read_calendar(args: Dict[str, Any]) -> Dict[str, Any]: """Read upcoming calendar events.""" days_ahead = args.get("days_ahead", 7) calendar_id = args.get("calendar_id", "primary") max_results = args.get("max_results", 20) _, calendar_client, _ = _initialize_google_clients() if not calendar_client: return { "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], "isError": True } result = calendar_client.list_events( days_ahead=days_ahead, calendar_id=calendar_id, max_results=max_results, ) if result["success"]: summary = result.get("summary", "No events found") if len(summary) > _MAX_TOOL_OUTPUT: summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" text = f"Upcoming events (next {days_ahead} days):\n\n{summary}" else: text = f"Error reading calendar: {result.get('error', 'Unknown error')}" return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} @tool( name="create_calendar_event", description="Create a new event in Google Calendar. Use ISO 8601 format for times.", input_schema={ "summary": str, "start_time": str, "end_time": str, "description": str, "location": str, "calendar_id": str, }, ) async def create_calendar_event(args: Dict[str, Any]) -> Dict[str, Any]: """Create a new calendar event.""" summary = args["summary"] start_time = args["start_time"] end_time = args["end_time"] description = args.get("description", "") location = args.get("location", "") calendar_id = args.get("calendar_id", "primary") _, calendar_client, _ = _initialize_google_clients() if not calendar_client: return { "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], "isError": True } result = calendar_client.create_event( summary=summary, start_time=start_time, end_time=end_time, description=description, location=location, calendar_id=calendar_id, ) if result["success"]: event_id = result.get("event_id", "unknown") html_link = result.get("html_link", "") start = result.get("start", start_time) text = ( f"Calendar event created successfully!\n" f"Title: {summary}\nStart: {start}\n" f"Event ID: {event_id}\nLink: {html_link}" ) else: text = f"Error creating calendar event: {result.get('error', 'Unknown error')}" return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} @tool( name="search_calendar", description="Search calendar events by text query. Searches event titles and descriptions.", input_schema={"query": str, "calendar_id": str}, ) async def search_calendar(args: Dict[str, Any]) -> Dict[str, Any]: """Search calendar events by text query.""" query = args["query"] calendar_id = args.get("calendar_id", "primary") _, calendar_client, _ = _initialize_google_clients() if not calendar_client: return { "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], "isError": True } result = calendar_client.search_events(query=query, calendar_id=calendar_id) if result["success"]: summary = result.get("summary", "No events found") if len(summary) > _MAX_TOOL_OUTPUT: summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" text = f"Calendar search results for '{query}':\n\n{summary}" else: text = f"Error searching calendar: {result.get('error', 'Unknown error')}" return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} # ============================================ # Contacts Tools (MCP) # ============================================ @tool( name="create_contact", description="Create a new Google contact. Requires prior OAuth setup (--setup-google).", input_schema={"given_name": str, "family_name": str, "email": str, "phone": str, "notes": str}, ) async def create_contact(args: Dict[str, Any]) -> Dict[str, Any]: """Create a new Google contact.""" given_name = args["given_name"] family_name = args.get("family_name", "") email = args.get("email", "") phone = args.get("phone") notes = args.get("notes") _, _, people_client = _initialize_google_clients() if not people_client: return { "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], "isError": True } result = people_client.create_contact( given_name=given_name, family_name=family_name, email=email, phone=phone, notes=notes, ) if result["success"]: name = result.get("name", given_name) resource = result.get("resource_name", "") text = f"Contact created: {name}\nResource: {resource}" else: text = f"Error creating contact: {result.get('error', 'Unknown error')}" return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} @tool( name="list_contacts", description="List or search Google contacts. Without a query, lists all contacts sorted by last name.", input_schema={"max_results": int, "query": str}, ) async def list_contacts(args: Dict[str, Any]) -> Dict[str, Any]: """List or search Google contacts.""" max_results = args.get("max_results", 100) query = args.get("query") _, _, people_client = _initialize_google_clients() if not people_client: return { "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], "isError": True } result = people_client.list_contacts(max_results=max_results, query=query) if result["success"]: summary = result.get("summary", "No contacts found.") if len(summary) > _MAX_TOOL_OUTPUT: summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)" text = f"Contacts ({result.get('count', 0)} found):\n\n{summary}" else: text = f"Error listing contacts: {result.get('error', 'Unknown error')}" return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} @tool( name="get_contact", description="Get full details of a specific Google contact by resource name.", input_schema={"resource_name": str}, ) async def get_contact(args: Dict[str, Any]) -> Dict[str, Any]: """Get full details of a specific Google contact.""" resource_name = args["resource_name"] _, _, people_client = _initialize_google_clients() if not people_client: return { "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}], "isError": True } result = people_client.get_contact(resource_name=resource_name) if result["success"]: c = result.get("contact", {}) output = [] name = c.get("display_name") or f"{c.get('given_name', '')} {c.get('family_name', '')}".strip() output.append(f"Name: {name or '(no name)'}") if c.get("email"): output.append(f"Email: {c['email']}") if c.get("phone"): output.append(f"Phone: {c['phone']}") if c.get("notes"): output.append(f"Notes: {c['notes']}") output.append(f"Resource: {c.get('resource_name', resource_name)}") text = "\n".join(output) else: text = f"Error getting contact: {result.get('error', 'Unknown error')}" return {"content": [{"type": "text", "text": text}], "isError": not result["success"]} # ============================================ # Gitea Tools (MCP) - Private repo access # ============================================ # Lazy-loaded Gitea client _gitea_client: Optional[Any] = None def _get_gitea_client(): """Lazy-load Gitea client when first needed.""" global _gitea_client if _gitea_client is not None: return _gitea_client try: from gitea_tools.client import get_gitea_client _gitea_client = get_gitea_client() return _gitea_client except Exception as e: print(f"[MCP Gitea] Failed to initialize: {e}") return None @tool( name="gitea_read_file", description="Read a file from a Gitea repository. Use this to access files from Jordan's homelab repo or any configured Gitea repo. Returns the file content as text.", input_schema={ "file_path": str, "repo": str, "branch": str, }, ) async def gitea_read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Read a file from a Gitea repository. Zero-cost MCP tool for accessing private Gitea repos. """ file_path = args.get("file_path", "") repo = args.get("repo") branch = args.get("branch") if not file_path: return { "content": [{"type": "text", "text": "Error: file_path is required"}], "isError": True, } client = _get_gitea_client() if not client: return { "content": [{ "type": "text", "text": ( "Error: Gitea not configured. " "Copy config/gitea_config.example.yaml to config/gitea_config.yaml " "and add your Personal Access Token." ), }], "isError": True, } # Parse owner/repo if provided owner = None if repo and "/" in repo: parts = repo.split("/", 1) owner = parts[0] repo = parts[1] result = await client.get_file_content( file_path=file_path, owner=owner, repo=repo, branch=branch, ) if result["success"]: content = result["content"] metadata = result.get("metadata", {}) path_info = metadata.get("path", file_path) size = metadata.get("size", 0) header = f"File: {path_info} ({size:,} bytes)" if metadata.get("truncated"): header += " [TRUNCATED]" return { "content": [{"type": "text", "text": f"{header}\n\n{content}"}], } else: return { "content": [{"type": "text", "text": f"Error: {result['error']}"}], "isError": True, } @tool( name="gitea_list_files", description="List files and folders in a directory in a Gitea repository. Use this to explore the structure of Jordan's homelab repo or any configured Gitea repo.", input_schema={ "path": str, "repo": str, "branch": str, }, ) async def gitea_list_files_tool(args: Dict[str, Any]) -> Dict[str, Any]: """List files and directories in a Gitea repo path. Zero-cost MCP tool for browsing private Gitea repos. """ path = args.get("path", "") repo = args.get("repo") branch = args.get("branch") client = _get_gitea_client() if not client: return { "content": [{ "type": "text", "text": ( "Error: Gitea not configured. " "Copy config/gitea_config.example.yaml to config/gitea_config.yaml " "and add your Personal Access Token." ), }], "isError": True, } # Parse owner/repo if provided owner = None if repo and "/" in repo: parts = repo.split("/", 1) owner = parts[0] repo = parts[1] result = await client.list_files( path=path, owner=owner, repo=repo, branch=branch, ) if result["success"]: files = result["files"] repo_name = result.get("repo", "") display_path = result.get("path", "/") count = result.get("count", 0) # Format output lines = [f"Directory: {repo_name}/{display_path} ({count} items)\n"] for f in files: if f["type"] == "dir": lines.append(f" DIR {f['name']}/") else: size_str = f"({f['size']:,} bytes)" if f["size"] else "" lines.append(f" FILE {f['name']} {size_str}") return { "content": [{"type": "text", "text": "\n".join(lines)}], } else: return { "content": [{"type": "text", "text": f"Error: {result['error']}"}], "isError": True, } @tool( name="gitea_search_code", description="Search for files by name/path in a Gitea repository. Searches file and directory names. For content search, use gitea_read_file on specific files.", input_schema={ "query": str, "repo": str, }, ) async def gitea_search_code_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Search for code/files in a Gitea repository. Zero-cost MCP tool. Searches file/directory names in the repo tree. """ query = args.get("query", "") repo = args.get("repo") if not query: return { "content": [{"type": "text", "text": "Error: query is required"}], "isError": True, } client = _get_gitea_client() if not client: return { "content": [{ "type": "text", "text": ( "Error: Gitea not configured. " "Copy config/gitea_config.example.yaml to config/gitea_config.yaml " "and add your Personal Access Token." ), }], "isError": True, } # Parse owner/repo if provided owner = None if repo and "/" in repo: parts = repo.split("/", 1) owner = parts[0] repo = parts[1] result = await client.search_code( query=query, owner=owner, repo=repo, ) if result["success"]: results = result.get("results", []) count = result.get("count", 0) repo_name = result.get("repo", "") if not results: message = result.get("message", f"No results for '{query}'") return { "content": [{"type": "text", "text": message}], } lines = [f"Search results for '{query}' in {repo_name} ({count} matches):\n"] for r in results: type_icon = "DIR " if r["type"] == "dir" else "FILE" size_str = f"({r['size']:,} bytes)" if r.get("size") else "" lines.append(f" {type_icon} {r['path']} {size_str}") return { "content": [{"type": "text", "text": "\n".join(lines)}], } else: return { "content": [{"type": "text", "text": f"Error: {result['error']}"}], "isError": True, } @tool( name="gitea_get_tree", description="Get the directory tree structure from a Gitea repository. Shows all files and folders. Use recursive=true for the full tree.", input_schema={ "repo": str, "branch": str, "recursive": bool, }, ) async def gitea_get_tree_tool(args: Dict[str, Any]) -> Dict[str, Any]: """Get directory tree from a Gitea repository. Zero-cost MCP tool for viewing repo structure. """ repo = args.get("repo") branch = args.get("branch") recursive = args.get("recursive", False) client = _get_gitea_client() if not client: return { "content": [{ "type": "text", "text": ( "Error: Gitea not configured. " "Copy config/gitea_config.example.yaml to config/gitea_config.yaml " "and add your Personal Access Token." ), }], "isError": True, } # Parse owner/repo if provided owner = None if repo and "/" in repo: parts = repo.split("/", 1) owner = parts[0] repo = parts[1] result = await client.get_tree( owner=owner, repo=repo, branch=branch, recursive=recursive, ) if result["success"]: entries = result.get("entries", []) repo_name = result.get("repo", "") branch_name = result.get("branch", "main") total = result.get("total", 0) truncated = result.get("truncated", False) lines = [f"Tree: {repo_name} (branch: {branch_name}, {total} entries)"] if truncated: lines[0] += " [TRUNCATED - tree too large]" lines.append("") for entry in entries: if entry["type"] == "dir": lines.append(f" {entry['path']}/") else: size_str = f"({entry['size']:,} bytes)" if entry.get("size") else "" lines.append(f" {entry['path']} {size_str}") # Truncate output if too long text = "\n".join(lines) if len(text) > _MAX_TOOL_OUTPUT: text = text[:_MAX_TOOL_OUTPUT] + "\n\n... (tree truncated, use gitea_list_files for specific directories)" return { "content": [{"type": "text", "text": text}], } else: return { "content": [{"type": "text", "text": f"Error: {result['error']}"}], "isError": True, } # Create the MCP server with all tools file_system_server = create_sdk_mcp_server( name="file_system", version="2.0.0", tools=[ # File and system tools read_file_tool, write_file_tool, edit_file_tool, list_directory_tool, run_command_tool, # Web tool web_fetch_tool, # Zettelkasten tools fleeting_note_tool, daily_note_tool, literature_note_tool, permanent_note_tool, search_vault_tool, search_by_tags_tool, # Weather get_weather, # Gmail tools send_email, read_emails, get_email, # Calendar tools read_calendar, create_calendar_event, search_calendar, # Contacts tools create_contact, list_contacts, get_contact, # Gitea tools gitea_read_file_tool, gitea_list_files_tool, gitea_search_code_tool, gitea_get_tree_tool, ] )