# ajarbot/mcp_tools.py
"""MCP Tools - In-process tools using Claude Agent SDK.
These tools run directly in the Python process for better performance
compared to the traditional tool execution flow. File and system tools
are ideal candidates for MCP since they don't require external APIs.
"""
import ipaddress
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import httpx
from bs4 import BeautifulSoup
from claude_agent_sdk import tool, create_sdk_mcp_server
# Import memory system for hybrid search
try:
    from memory_system import MemorySystem
    MEMORY_AVAILABLE = True
except ImportError:
    # Optional dependency: note tools degrade to plain keyword search without it.
    MEMORY_AVAILABLE = False

# Maximum characters of tool output to return (prevents token explosion)
_MAX_TOOL_OUTPUT = 5000
# Maximum page size for web fetching (500KB)
_MAX_WEB_PAGE_SIZE = 500_000
# Maximum text content from web page (10,000 chars ≈ 2,500 tokens)
_MAX_WEB_TEXT = 10_000

# Zettelkasten vault paths (relative to the process working directory)
_VAULT_ROOT = Path("memory_workspace/obsidian")
_VAULT_FLEETING = _VAULT_ROOT / "fleeting"
_VAULT_DAILY = _VAULT_ROOT / "daily"
_VAULT_PERMANENT = _VAULT_ROOT / "permanent"
_VAULT_LITERATURE = _VAULT_ROOT / "literature"
def _generate_note_id() -> str:
"""Generate unique timestamp-based note ID (YYYYMMDDHHmmss)."""
return datetime.now().strftime("%Y%m%d%H%M%S")
def _create_frontmatter(note_id: str, title: str, tags: list = None, note_type: str = "fleeting") -> str:
"""Create YAML front matter for zettelkasten note."""
now = datetime.now()
tags_str = ", ".join(tags) if tags else ""
return f"""---
id: {note_id}
title: {title}
created: {now.strftime("%Y-%m-%dT%H:%M:%S")}
modified: {now.strftime("%Y-%m-%dT%H:%M:%S")}
type: {note_type}
tags: [{tags_str}]
---
"""
def _ensure_vault_structure():
    """Create all zettelkasten vault directories if any are missing."""
    required = (_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE)
    for directory in required:
        directory.mkdir(parents=True, exist_ok=True)
def _get_memory_system() -> Optional['MemorySystem']:
    """Return a MemorySystem over the obsidian vault, or None if unavailable.

    None is returned both when the memory_system package failed to import at
    module load and when construction raises; hybrid search is best-effort
    and callers fall back to keyword search.
    """
    if not MEMORY_AVAILABLE:
        return None
    try:
        return MemorySystem(workspace_dir=_VAULT_ROOT)
    except Exception:
        return None
def _find_related_notes_hybrid(title: str, content: str, max_results: int = 5) -> List[tuple]:
    """Find related notes using hybrid search (vector + keyword).

    Tries the MemorySystem hybrid search first; on any failure silently falls
    back to a keyword-overlap scan of the vault.

    Args:
        title: Title of the note being created.
        content: Body text of the note (only a prefix is used as the query).
        max_results: Maximum number of related notes to return.

    Returns:
        List of (note_title, score) tuples. NOTE: the score semantics differ
        between the two paths (snippet-highlight pairs vs. keyword hit count).
    """
    memory = _get_memory_system()
    if memory:
        # Use hybrid search for better results
        try:
            # Re-indexes the vault on every call; presumably MemorySystem
            # deduplicates already-indexed files internally -- TODO confirm.
            for vault_dir in [_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE]:
                if vault_dir.exists():
                    for note_path in vault_dir.glob("*.md"):
                        memory.index_file(note_path)
            # Search with content + title as query
            query = f"{title} {content[:500]}"  # Limit content to first 500 chars
            results = memory.search_hybrid(query, max_results=max_results)
            # Convert results to (title, score) tuples
            related = []
            for result in results:
                note_path = Path(result["path"])
                note_title = note_path.stem
                # Filenames look like "<id> - <title>.md"; strip the id prefix.
                if ' - ' in note_title:
                    note_title = note_title.split(' - ', 1)[1]
                # Score heuristic: each highlighted match in the snippet is
                # wrapped in "**...**", so pairs of "**" approximate matches.
                score = result.get("snippet", "").count("**") // 2  # Count of matches
                related.append((note_title, score))
            return related
        except Exception:
            pass  # Fall back to keyword search
    # Fallback: keyword search over the raw note text.
    related_notes = []
    # Terms come from the title plus the first 20 words of the content;
    # short words (<= 4 chars) are dropped to reduce noise.
    search_terms = title.lower().split() + content.lower().split()[:20]
    search_terms = [term for term in search_terms if len(term) > 4]
    for vault_dir in [_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE]:
        if not vault_dir.exists():
            continue
        for note_path in vault_dir.glob("*.md"):
            try:
                note_content = note_path.read_text(encoding="utf-8").lower()
                matches = sum(1 for term in search_terms if term in note_content)
                # Require at least two overlapping terms to count as related.
                if matches >= 2:
                    note_title = note_path.stem
                    if ' - ' in note_title:
                        note_title = note_title.split(' - ', 1)[1]
                    related_notes.append((note_title, matches))
            except Exception:
                continue
    related_notes.sort(key=lambda x: x[1], reverse=True)
    return related_notes[:max_results]
def _is_safe_url(url: str) -> bool:
"""Validate URL safety - blocks localhost, private IPs, and file:// URLs."""
try:
parsed = urlparse(url)
# Must be HTTP/HTTPS
if parsed.scheme not in ['http', 'https']:
return False
# Must have a hostname
if not parsed.hostname:
return False
# Block localhost and loopback IPs
blocked_hosts = ['localhost', '127.0.0.1', '0.0.0.0', '::1']
if parsed.hostname in blocked_hosts:
return False
# Block private IP ranges (10.x.x.x, 192.168.x.x, 172.16-31.x.x)
if parsed.hostname:
parts = parsed.hostname.split('.')
if len(parts) == 4 and parts[0].isdigit():
first_octet = int(parts[0])
# Check for private ranges
if first_octet == 10: # 10.0.0.0/8
return False
if first_octet == 192 and len(parts) > 1 and parts[1].isdigit() and int(parts[1]) == 168: # 192.168.0.0/16
return False
if first_octet == 172 and len(parts) > 1 and parts[1].isdigit():
second_octet = int(parts[1])
if 16 <= second_octet <= 31: # 172.16.0.0/12
return False
return True
except Exception:
return False
@tool(
    name="read_file",
    description="Read the contents of a file. Use this to view configuration files, code, or any text file.",
    input_schema={
        "file_path": str,
    },
)
async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Return a file's text contents, truncated to _MAX_TOOL_OUTPUT chars."""
    file_path = args["file_path"]
    target = Path(file_path)
    if not target.exists():
        return {
            "content": [{"type": "text", "text": f"Error: File not found: {file_path}"}],
            "isError": True,
        }
    try:
        text = target.read_text(encoding="utf-8")
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error reading file: {str(e)}"}],
            "isError": True,
        }
    # Bound the result so a huge file cannot blow up the token budget.
    if len(text) > _MAX_TOOL_OUTPUT:
        text = text[:_MAX_TOOL_OUTPUT] + "\n... (file truncated)"
    return {
        "content": [{"type": "text", "text": f"Content of {file_path}:\n\n{text}"}]
    }
@tool(
    name="write_file",
    description="Write content to a file. Creates a new file or overwrites existing file completely.",
    input_schema={
        "file_path": str,
        "content": str,
    },
)
async def write_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create or completely overwrite a file with the given content."""
    file_path = args["file_path"]
    content = args["content"]
    try:
        destination = Path(file_path)
        # Ensure the parent directory chain exists before writing.
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(content, encoding="utf-8")
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error writing file: {str(e)}"}],
            "isError": True,
        }
    return {
        "content": [{
            "type": "text",
            "text": f"Successfully wrote to {file_path} ({len(content)} characters)"
        }]
    }
@tool(
    name="edit_file",
    description="Edit a file by replacing specific text. Use this to make targeted changes without rewriting the entire file.",
    input_schema={
        "file_path": str,
        "old_text": str,
        "new_text": str,
    },
)
async def edit_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Replace the first occurrence of old_text with new_text in a file."""
    file_path = args["file_path"]
    old_text = args["old_text"]
    new_text = args["new_text"]
    target = Path(file_path)
    if not target.exists():
        return {
            "content": [{"type": "text", "text": f"Error: File not found: {file_path}"}],
            "isError": True,
        }
    try:
        original = target.read_text(encoding="utf-8")
        if old_text not in original:
            # Echo a prefix of the needle so the caller can adjust it.
            return {
                "content": [{
                    "type": "text",
                    "text": f"Error: Text not found in file. Could not find:\n{old_text[:100]}..."
                }],
                "isError": True,
            }
        # Only the first occurrence is replaced, by design.
        target.write_text(original.replace(old_text, new_text, 1), encoding="utf-8")
        return {
            "content": [{
                "type": "text",
                "text": f"Successfully edited {file_path}. Replaced 1 occurrence."
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error editing file: {str(e)}"}],
            "isError": True,
        }
@tool(
    name="list_directory",
    description="List files and directories in a given path. Useful for exploring the file system.",
    input_schema={
        "path": str,
    },
)
async def list_directory_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """List directory contents, one line per entry.

    Entries are sorted by name and tagged "DIR " or "FILE" (files also show
    their size in bytes). Improvement over the previous version: the listing
    is now truncated at _MAX_TOOL_OUTPUT like every other tool, so a huge
    directory cannot explode the token budget.
    """
    dir_path = Path(args.get("path", "."))
    if not dir_path.exists():
        return {
            "content": [{"type": "text", "text": f"Error: Directory not found: {dir_path}"}],
            "isError": True
        }
    if not dir_path.is_dir():
        return {
            "content": [{"type": "text", "text": f"Error: Not a directory: {dir_path}"}],
            "isError": True
        }
    try:
        items = []
        for item in sorted(dir_path.iterdir()):
            item_type = "DIR " if item.is_dir() else "FILE"
            size = "" if item.is_dir() else f" ({item.stat().st_size} bytes)"
            items.append(f" {item_type} {item.name}{size}")
        if not items:
            return {
                "content": [{"type": "text", "text": f"Directory {dir_path} is empty"}]
            }
        listing = f"Contents of {dir_path}:\n" + "\n".join(items)
        # Keep output bounded, consistent with the file/command tools.
        if len(listing) > _MAX_TOOL_OUTPUT:
            listing = listing[:_MAX_TOOL_OUTPUT] + "\n... (listing truncated)"
        return {
            "content": [{"type": "text", "text": listing}]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error listing directory: {str(e)}"}],
            "isError": True
        }
@tool(
    name="run_command",
    description="Execute a shell command. Use for git operations, running scripts, installing packages, etc.",
    input_schema={
        "command": str,
        "working_dir": str,
    },
)
async def run_command_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Run a shell command and report its exit code, stdout, and stderr.

    NOTE: shell=True is intentional here -- this tool exists precisely to
    execute arbitrary shell strings on behalf of the agent.
    """
    command = args["command"]
    cwd = args.get("working_dir", ".")

    def _capped(stream: str, label: str) -> str:
        # Truncate a stream and render its labelled section.
        if len(stream) > _MAX_TOOL_OUTPUT:
            stream = stream[:_MAX_TOOL_OUTPUT] + f"\n... ({label} truncated)"
        return f"{label.upper()}:\n{stream}"

    try:
        proc = subprocess.run(
            command,
            shell=True,
            cwd=cwd,
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        return {
            "content": [{"type": "text", "text": "Error: Command timed out after 30 seconds"}],
            "isError": True,
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error running command: {str(e)}"}],
            "isError": True,
        }
    sections = []
    if proc.stdout:
        sections.append(_capped(proc.stdout, "stdout"))
    if proc.stderr:
        sections.append(_capped(proc.stderr, "stderr"))
    text = f"Command exited with code {proc.returncode}"
    if sections:
        text += "\n\n" + "\n\n".join(sections)
    return {
        "content": [{"type": "text", "text": text}],
        "isError": proc.returncode != 0,
    }
@tool(
    name="web_fetch",
    description="Fetch and parse content from a web page. Returns the page text content for analysis. Use this to get real-time information from the web.",
    input_schema={
        "url": str,
    },
)
async def web_fetch_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch webpage and return parsed text content.

    This is a zero-cost MCP tool - it fetches the HTML and converts to text,
    then returns it to the main Agent SDK query for processing (no extra API cost).

    Fix over previous version: an empty or element-nested <title> makes
    soup.title.string None, which rendered as "Title: None"; we now fall back
    to "No title" in that case too.
    """
    url = args["url"]
    # Security: refuse localhost/private-network/non-HTTP URLs up front.
    if not _is_safe_url(url):
        return {
            "content": [{
                "type": "text",
                "text": f"Error: Blocked unsafe URL - {url}\n\nOnly http/https URLs to public domains are allowed."
            }],
            "isError": True
        }
    try:
        headers = {
            "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)"
        }
        async with httpx.AsyncClient(
            timeout=10.0,
            follow_redirects=True,
            limits=httpx.Limits(max_connections=5),
            headers=headers
        ) as client:
            response = await client.get(url)
            response.raise_for_status()
            # NOTE(review): the size check runs after the body has already
            # been downloaded; it bounds what we parse/return, not transfer.
            if len(response.content) > _MAX_WEB_PAGE_SIZE:
                return {
                    "content": [{
                        "type": "text",
                        "text": f"Error: Page too large ({len(response.content)} bytes, max {_MAX_WEB_PAGE_SIZE})"
                    }],
                    "isError": True
                }
            # Parse HTML and drop non-content elements before text extraction.
            soup = BeautifulSoup(response.content, 'html.parser')
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']):
                element.decompose()
            # Extract text and collapse blank lines.
            text = soup.get_text(separator='\n', strip=True)
            lines = [line.strip() for line in text.split('\n') if line.strip()]
            text = '\n'.join(lines)
            # Truncate to keep token usage bounded.
            if len(text) > _MAX_WEB_TEXT:
                text = text[:_MAX_WEB_TEXT] + "\n\n... (content truncated, page is very long)"
            # Title fallback covers both a missing <title> and an empty one.
            title = soup.title.string if soup.title and soup.title.string else "No title"
            return {
                "content": [{
                    "type": "text",
                    "text": f"Fetched: {response.url}\nTitle: {title}\n\n{text}"
                }]
            }
    except httpx.TimeoutException:
        return {
            "content": [{
                "type": "text",
                "text": f"Error: Request to {url} timed out after 10 seconds"
            }],
            "isError": True
        }
    except httpx.HTTPStatusError as e:
        return {
            "content": [{
                "type": "text",
                "text": f"Error: HTTP {e.response.status_code} - {url}"
            }],
            "isError": True
        }
    except Exception as e:
        return {
            "content": [{
                "type": "text",
                "text": f"Error fetching webpage: {str(e)}"
            }],
            "isError": True
        }
@tool(
    name="fleeting_note",
    description="Quickly capture a thought or idea as a fleeting note in your zettelkasten. Use this for quick captures that can be processed later into permanent notes.",
    input_schema={
        "content": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def fleeting_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a fleeting note (quick capture) in the zettelkasten vault.

    Zero-cost MCP tool for instant thought capture. Perfect for ADHD-friendly quick notes.

    Fixes over previous version:
    - The confirmation message now includes the note title (it previously
      printed a literal placeholder instead).
    - An empty first line no longer yields an empty title/filename; we fall
      back to "Quick Note".
    """
    content = args["content"]
    tags_str = args.get("tags", "")
    try:
        _ensure_vault_structure()
        # Note ID is a timestamp; title is the first line, capped at 50 chars.
        note_id = _generate_note_id()
        first_line = content.split('\n', 1)[0][:50]
        title = first_line if first_line else "Quick Note"
        # Parse comma-separated tags; every capture is tagged "fleeting".
        tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else []
        tags.append("fleeting")
        # Filename combines the ID and a filesystem-safe title.
        filename = f"{note_id} - {title.replace(':', '').replace('/', '-')}.md"
        note_path = _VAULT_FLEETING / filename
        # Write YAML front matter followed by the raw content.
        frontmatter = _create_frontmatter(note_id, title, tags, "fleeting")
        note_path.write_text(frontmatter + content, encoding="utf-8")
        return {
            "content": [{
                "type": "text",
                "text": f"Created fleeting note: {title}\nID: {note_id}\nLocation: {note_path}"
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error creating fleeting note: {str(e)}"}],
            "isError": True
        }
@tool(
    name="daily_note",
    description="Add an entry to today's daily note. Use this for journaling, logging activities, or tracking daily thoughts.",
    input_schema={
        "entry": str,
    },
)
async def daily_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Append timestamped entry to today's daily note.

    Zero-cost MCP tool for daily journaling and activity logging. The note
    file is named YYYY-MM-DD.md; entries are "## HH:MM" sections.
    """
    entry = args["entry"]
    try:
        _ensure_vault_structure()
        now = datetime.now()
        date_str = now.strftime("%Y-%m-%d")
        time_str = now.strftime("%H:%M")
        note_path = _VAULT_DAILY / f"{date_str}.md"
        if note_path.exists():
            # Existing note: append a new timestamped section.
            body = note_path.read_text(encoding="utf-8") + f"\n## {time_str}\n{entry}\n"
        else:
            # First entry today: create the note with front matter + heading.
            body = f"""---
date: {date_str}
type: daily
tags: [daily-note]
---
# {date_str}
## {time_str}
{entry}
"""
        note_path.write_text(body, encoding="utf-8")
        return {
            "content": [{
                "type": "text",
                "text": f"Added entry to daily note: {date_str}\nTime: {time_str}\nLocation: {note_path}"
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error updating daily note: {str(e)}"}],
            "isError": True
        }
@tool(
    name="literature_note",
    description="Create a literature note from a web article. Fetches the article, extracts key points, and creates a properly formatted zettelkasten note with source citation.",
    input_schema={
        "url": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def literature_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create literature note from web article.

    Zero-cost MCP tool. Combines web_fetch with note creation.

    Fix over previous version: an empty or element-nested <title> makes
    soup.title.string None, which crashed the title[:50] slice below with a
    TypeError; we now fall back to "Untitled Article" in that case as well.
    """
    url = args["url"]
    tags_str = args.get("tags", "")
    try:
        _ensure_vault_structure()
        # Same SSRF guard as web_fetch.
        if not _is_safe_url(url):
            return {
                "content": [{
                    "type": "text",
                    "text": f"Error: Blocked unsafe URL - {url}"
                }],
                "isError": True
            }
        headers = {
            "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)"
        }
        async with httpx.AsyncClient(timeout=10.0, follow_redirects=True, headers=headers) as client:
            response = await client.get(url)
            response.raise_for_status()
            # NOTE(review): size check happens after download; it bounds what
            # we store, not what was transferred.
            if len(response.content) > _MAX_WEB_PAGE_SIZE:
                return {
                    "content": [{
                        "type": "text",
                        "text": "Error: Page too large"
                    }],
                    "isError": True
                }
            # Strip non-content elements, then flatten the page to plain text.
            soup = BeautifulSoup(response.content, 'html.parser')
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']):
                element.decompose()
            text = soup.get_text(separator='\n', strip=True)
            lines = [line.strip() for line in text.split('\n') if line.strip()]
            text = '\n'.join(lines)[:_MAX_WEB_TEXT]
            # Title fallback covers both missing and empty <title> elements.
            title = soup.title.string if soup.title and soup.title.string else "Untitled Article"
            # Generate note metadata.
            note_id = _generate_note_id()
            tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else []
            tags.extend(["literature", "web-article"])
            # Note body: captured content plus scaffolding for later processing.
            note_content = f"""# {title}
**Source**: {url}
**Captured**: {datetime.now().strftime("%Y-%m-%d")}
## Content
{text}
## Notes
(Add your thoughts, key takeaways, and connections to other notes here)
## Related
-
---
*This is a literature note. Process it into permanent notes with key insights.*
"""
            # Filesystem-safe filename from the ID and truncated title.
            filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md"
            note_path = _VAULT_LITERATURE / filename
            frontmatter = _create_frontmatter(note_id, title, tags, "literature")
            note_path.write_text(frontmatter + note_content, encoding="utf-8")
            return {
                "content": [{
                    "type": "text",
                    "text": f"Created literature note from article\nTitle: {title}\nID: {note_id}\nLocation: {note_path}"
                }]
            }
    except httpx.TimeoutException:
        return {
            "content": [{"type": "text", "text": "Error: Request timed out"}],
            "isError": True
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error creating literature note: {str(e)}"}],
            "isError": True
        }
@tool(
    name="permanent_note",
    description="Create a permanent note with automatic link suggestions to related notes. Use this for refined, well-thought-out notes that form the core of your knowledge base.",
    input_schema={
        "title": str,
        "content": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def permanent_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create permanent note with smart linking suggestions.

    Zero-cost MCP tool. Uses simple search for now.
    Future: Will use hybrid search for better suggestions.
    """
    title = args["title"]
    content = args["content"]
    tags_str = args.get("tags", "")
    try:
        _ensure_vault_structure()
        note_id = _generate_note_id()
        # Tags: caller-supplied plus the mandatory "permanent" marker.
        tags = [tag.strip() for tag in tags_str.split(',')] if tags_str else []
        tags.append("permanent")
        # Suggest wiki-links to related notes (hybrid vector+keyword search).
        related_notes = _find_related_notes_hybrid(title, content, max_results=5)
        related_section = "\n## Related Notes\n"
        if not related_notes:
            related_section += "- (No related notes found yet)\n"
        else:
            for note_title, _ in related_notes:
                related_section += f"- [[{note_title}]]\n"
        note_content = f"""# {title}
{content}
{related_section}
---
*Created: {datetime.now().strftime("%Y-%m-%d %H:%M")}*
"""
        # Filesystem-safe filename from the ID and truncated title.
        filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md"
        note_path = _VAULT_PERMANENT / filename
        frontmatter = _create_frontmatter(note_id, title, tags, "permanent")
        note_path.write_text(frontmatter + note_content, encoding="utf-8")
        # Confirmation message, listing suggested links when any were found.
        result_msg = f"Created permanent note: {title}\nID: {note_id}\nLocation: {note_path}"
        if related_notes:
            result_msg += f"\n\nSuggested links ({len(related_notes)} related notes):"
            for note_title, matches in related_notes:
                result_msg += f"\n- [[{note_title}]] ({matches} keyword matches)"
        return {
            "content": [{
                "type": "text",
                "text": result_msg
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error creating permanent note: {str(e)}"}],
            "isError": True
        }
@tool(
    name="search_by_tags",
    description="Search zettelkasten vault by tags. Find all notes with specific tags or tag combinations.",
    input_schema={
        "tags": str,  # Comma-separated tags to search for
        "match_all": bool,  # If true, note must have ALL tags. If false, ANY tag matches (optional, default false)
        "limit": int,  # Max results (optional, default 10)
    },
)
async def search_by_tags_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search vault by tags.

    Zero-cost MCP tool. Scans each note's YAML front matter for a "tags:"
    line and matches the requested tags (case-insensitive).
    """
    tags_str = args["tags"]
    match_all = args.get("match_all", False)
    limit = args.get("limit", 10)
    try:
        _ensure_vault_structure()
        search_tags = [tag.strip().lower() for tag in tags_str.split(',')]
        predicate = all if match_all else any
        results = []
        for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]:
            if not vault_dir.exists():
                continue
            for note_path in vault_dir.glob("*.md"):
                try:
                    content = note_path.read_text(encoding="utf-8")
                    # Only notes with YAML front matter are considered.
                    if not content.startswith("---"):
                        continue
                    parts = content.split("---", 2)
                    if len(parts) < 2:
                        continue
                    for line in parts[1].split('\n'):
                        if not line.strip().startswith('tags:'):
                            continue
                        # "tags: [a, b]" -> ["a", "b"] (lowercased).
                        tags_part = line.split(':', 1)[1].strip().strip('[]')
                        note_tags = [t.strip().lower() for t in tags_part.split(',')]
                        if predicate(tag in note_tags for tag in search_tags):
                            results.append(note_path.name)
                        # Only the first tags line per note is inspected.
                        break
                except Exception:
                    continue
        results = results[:limit]
        if not results:
            match_type = "all" if match_all else "any"
            return {
                "content": [{
                    "type": "text",
                    "text": f"No notes found with {match_type} of tags: {tags_str}"
                }]
            }
        result_text = f"Found {len(results)} note(s) with tags '{tags_str}':\n\n"
        for i, note_name in enumerate(results, 1):
            result_text += f"{i}. {note_name}\n"
        return {
            "content": [{
                "type": "text",
                "text": result_text
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error searching by tags: {str(e)}"}],
            "isError": True
        }
@tool(
    name="search_vault",
    description="Search your zettelkasten vault for notes matching a query. Returns relevant notes with context. Optionally filter by tags.",
    input_schema={
        "query": str,
        "tags": str,  # Optional comma-separated tags to filter by
        "limit": int,  # Max results (optional, default 5)
    },
)
async def search_vault_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search zettelkasten vault using hybrid search (vector + keyword).

    Zero-cost MCP tool. Uses hybrid search when available for better results;
    otherwise falls back to a literal substring scan. Results can then be
    filtered by front-matter tags before being truncated to `limit`.
    """
    query = args["query"]
    tags_filter = args.get("tags", "")
    limit = args.get("limit", 5)
    try:
        _ensure_vault_structure()
        # Try hybrid search first (best-effort; may be unavailable).
        memory = _get_memory_system()
        results = []
        if memory and MEMORY_AVAILABLE:
            try:
                # Re-index the vault on every call; presumably MemorySystem
                # deduplicates already-indexed files -- TODO confirm.
                for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]:
                    if vault_dir.exists():
                        for note_path in vault_dir.glob("*.md"):
                            memory.index_file(note_path)
                # Over-fetch (limit * 2) so the tag filter below still has
                # enough candidates after discarding non-matching notes.
                search_results = memory.search_hybrid(query, max_results=limit * 2)
                # Convert to the common results format.
                for result in search_results:
                    note_path = Path(result["path"])
                    results.append({
                        "file": note_path.name,
                        "path": str(note_path),
                        "snippet": result.get("snippet", "")[:300]
                    })
            except Exception:
                pass  # Fall back to simple search
        # Fallback: case-insensitive substring search over raw note text.
        if not results:
            query_lower = query.lower()
            for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]:
                if not vault_dir.exists():
                    continue
                for note_path in vault_dir.glob("*.md"):
                    try:
                        content = note_path.read_text(encoding="utf-8")
                        if query_lower in content.lower():
                            # Snippet: two lines of context around the first hit.
                            lines = content.split('\n')
                            for i, line in enumerate(lines):
                                if query_lower in line.lower():
                                    start = max(0, i - 2)
                                    end = min(len(lines), i + 3)
                                    snippet = '\n'.join(lines[start:end])
                                    results.append({
                                        "file": note_path.name,
                                        "path": str(note_path),
                                        "snippet": snippet[:300]
                                    })
                                    break
                    except Exception:
                        continue
        # Filter by front-matter tags if specified (re-reads each note).
        if tags_filter:
            filter_tags = [t.strip().lower() for t in tags_filter.split(',')]
            filtered_results = []
            for result in results:
                try:
                    note_path = Path(result["path"])
                    content = note_path.read_text(encoding="utf-8")
                    # Check tags in the YAML frontmatter block.
                    if content.startswith("---"):
                        parts = content.split("---", 2)
                        if len(parts) >= 2:
                            frontmatter = parts[1]
                            for line in frontmatter.split('\n'):
                                if line.strip().startswith('tags:'):
                                    tags_part = line.split(':', 1)[1].strip().strip('[]')
                                    note_tags = [t.strip().lower() for t in tags_part.split(',')]
                                    # ANY-match semantics for the filter.
                                    if any(tag in note_tags for tag in filter_tags):
                                        filtered_results.append(result)
                                    break
                except Exception:
                    continue
            results = filtered_results
        # Limit results
        results = results[:limit]
        if not results:
            tag_msg = f" with tags '{tags_filter}'" if tags_filter else ""
            return {
                "content": [{
                    "type": "text",
                    "text": f"No notes found matching: {query}{tag_msg}"
                }]
            }
        # Format the numbered result list with snippets.
        tag_msg = f" (filtered by tags: {tags_filter})" if tags_filter else ""
        result_text = f"Found {len(results)} note(s) matching '{query}'{tag_msg}:\n\n"
        for i, result in enumerate(results, 1):
            result_text += f"{i}. {result['file']}\n"
            result_text += f" {result['snippet']}\n\n"
        return {
            "content": [{
                "type": "text",
                "text": result_text
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error searching vault: {str(e)}"}],
            "isError": True
        }
# ============================================
# Google and Weather Tools (MCP Migration)
# ============================================
# Lazy-loaded Google clients (populated on first successful init).
_gmail_client: Optional[Any] = None
_calendar_client: Optional[Any] = None
_people_client: Optional[Any] = None


def _initialize_google_clients():
    """Lazy-load Google API clients when needed.

    Returns a (gmail, calendar, people) tuple. All three are None when OAuth
    credentials are missing or initialization fails; successful clients are
    cached in module globals and reused on subsequent calls.
    """
    global _gmail_client, _calendar_client, _people_client
    # Already initialized: reuse the cached clients.
    if _gmail_client is not None:
        return _gmail_client, _calendar_client, _people_client
    try:
        from google_tools.gmail_client import GmailClient
        from google_tools.calendar_client import CalendarClient
        from google_tools.people_client import PeopleClient
        from google_tools.oauth_manager import GoogleOAuthManager

        oauth_manager = GoogleOAuthManager()
        # No stored credentials means the user never ran --setup-google.
        if not oauth_manager.get_credentials():
            return None, None, None
        _gmail_client = GmailClient(oauth_manager)
        _calendar_client = CalendarClient(oauth_manager)
        _people_client = PeopleClient(oauth_manager)
    except Exception as e:
        print(f"[MCP Google] Failed to initialize: {e}")
        return None, None, None
    return _gmail_client, _calendar_client, _people_client
@tool(
    name="get_weather",
    description="Get current weather for a location using OpenWeatherMap API. Returns temperature, conditions, and description.",
    input_schema={"location": str},
)
async def get_weather(args: Dict[str, Any]) -> Dict[str, Any]:
    """Get current weather for a location using OpenWeatherMap API.

    Fix over previous version: the API is now called over HTTPS so the API
    key is no longer sent in cleartext.

    NOTE(review): requests.get is a blocking call inside an async tool --
    acceptable for a single quick lookup, but consider httpx.AsyncClient if
    this ever runs on a latency-sensitive event loop.
    """
    location = args.get("location", "Phoenix, US")
    import os
    import requests
    api_key = os.getenv("OPENWEATHERMAP_API_KEY")
    if not api_key:
        return {
            "content": [{
                "type": "text",
                "text": "Error: OPENWEATHERMAP_API_KEY not found in environment variables"
            }],
            "isError": True
        }
    try:
        # HTTPS endpoint; units=imperial gives Fahrenheit.
        base_url = "https://api.openweathermap.org/data/2.5/weather"
        params = {
            "q": location,
            "appid": api_key,
            "units": "imperial"
        }
        response = requests.get(base_url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        temp = data["main"]["temp"]
        feels_like = data["main"]["feels_like"]
        humidity = data["main"]["humidity"]
        conditions = data["weather"][0]["main"]
        description = data["weather"][0]["description"]
        city_name = data["name"]
        summary = (
            f"Weather in {city_name}:\n"
            f"Temperature: {temp}°F (feels like {feels_like}°F)\n"
            f"Conditions: {conditions} - {description}\n"
            f"Humidity: {humidity}%"
        )
        return {
            "content": [{"type": "text", "text": summary}]
        }
    except Exception as e:
        return {
            "content": [{
                "type": "text",
                "text": f"Error getting weather: {str(e)}"
            }],
            "isError": True
        }
@tool(
    name="send_email",
    description="Send an email via Gmail API. Requires prior OAuth setup (--setup-google).",
    input_schema={"to": str, "subject": str, "body": str, "cc": str, "reply_to_message_id": str},
)
async def send_email(args: Dict[str, Any]) -> Dict[str, Any]:
    """Send an email via Gmail API, optionally as a reply with CC."""
    to = args["to"]
    subject = args["subject"]
    body = args["body"]
    gmail_client, _, _ = _initialize_google_clients()
    if not gmail_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True,
        }
    result = gmail_client.send_email(
        to=to,
        subject=subject,
        body=body,
        cc=args.get("cc"),
        reply_to_message_id=args.get("reply_to_message_id"),
    )
    if result["success"]:
        text = (
            f"Email sent successfully to {to}\n"
            f"Message ID: {result.get('message_id', 'unknown')}\n"
            f"Subject: {subject}"
        )
    else:
        text = f"Error sending email: {result.get('error', 'Unknown error')}"
    return {"content": [{"type": "text", "text": text}], "isError": not result["success"]}
@tool(
    name="read_emails",
    description="Search and read emails from Gmail using Gmail query syntax (e.g., 'from:user@example.com after:2026/02/10').",
    input_schema={"query": str, "max_results": int, "include_body": bool},
)
async def read_emails(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search Gmail and return a (possibly truncated) result summary."""
    gmail_client, _, _ = _initialize_google_clients()
    if not gmail_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True,
        }
    result = gmail_client.search_emails(
        query=args.get("query", ""),
        max_results=args.get("max_results", 10),
        include_body=args.get("include_body", False),
    )
    if not result["success"]:
        summary = f"Error reading emails: {result.get('error', 'Unknown error')}"
    else:
        summary = result.get("summary", "")
        # Keep the response within the token budget.
        if len(summary) > _MAX_TOOL_OUTPUT:
            summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
    return {"content": [{"type": "text", "text": summary}], "isError": not result["success"]}
@tool(
    name="get_email",
    description="Get full content of a specific email by its Gmail message ID.",
    input_schema={"message_id": str, "format_type": str},
)
async def get_email(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch one email by Gmail message ID and render headers plus body."""
    message_id = args["message_id"]
    format_type = args.get("format_type", "text")
    gmail_client, _, _ = _initialize_google_clients()
    if not gmail_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True,
        }
    result = gmail_client.get_email(message_id=message_id, format_type=format_type)
    if not result["success"]:
        text = f"Error getting email: {result.get('error', 'Unknown error')}"
    else:
        email_data = result.get("email", {})
        text = (
            f"From: {email_data.get('from', 'Unknown')}\n"
            f"To: {email_data.get('to', 'Unknown')}\n"
            f"Subject: {email_data.get('subject', 'No subject')}\n"
            f"Date: {email_data.get('date', 'Unknown')}\n\n"
            f"{email_data.get('body', 'No content')}"
        )
        # Bound the body so a huge email stays within the token budget.
        if len(text) > _MAX_TOOL_OUTPUT:
            text = text[:_MAX_TOOL_OUTPUT] + "\n... (content truncated)"
    return {"content": [{"type": "text", "text": text}], "isError": not result["success"]}
@tool(
    name="read_calendar",
    description="Read upcoming events from Google Calendar. Shows events from today onwards.",
    input_schema={"days_ahead": int, "calendar_id": str, "max_results": int},
)
async def read_calendar(args: Dict[str, Any]) -> Dict[str, Any]:
    """List upcoming calendar events within the requested window."""
    days_ahead = args.get("days_ahead", 7)
    _, calendar_client, _ = _initialize_google_clients()
    if not calendar_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True,
        }
    result = calendar_client.list_events(
        days_ahead=days_ahead,
        calendar_id=args.get("calendar_id", "primary"),
        max_results=args.get("max_results", 20),
    )
    if not result["success"]:
        text = f"Error reading calendar: {result.get('error', 'Unknown error')}"
    else:
        summary = result.get("summary", "No events found")
        # Bound the listing so a packed calendar stays within budget.
        if len(summary) > _MAX_TOOL_OUTPUT:
            summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
        text = f"Upcoming events (next {days_ahead} days):\n\n{summary}"
    return {"content": [{"type": "text", "text": text}], "isError": not result["success"]}
@tool(
    name="create_calendar_event",
    description="Create a new event in Google Calendar. Use ISO 8601 format for times.",
    input_schema={
        "summary": str, "start_time": str, "end_time": str,
        "description": str, "location": str, "calendar_id": str,
    },
)
async def create_calendar_event(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a Google Calendar event and report its ID and link.

    Required args: summary, start_time, end_time (ISO 8601 strings).
    Optional args: description, location, calendar_id (default "primary").
    """
    summary = args["summary"]
    start_time = args["start_time"]
    end_time = args["end_time"]
    _, calendar_client, _ = _initialize_google_clients()
    if not calendar_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }
    result = calendar_client.create_event(
        summary=summary,
        start_time=start_time,
        end_time=end_time,
        description=args.get("description", ""),
        location=args.get("location", ""),
        calendar_id=args.get("calendar_id", "primary"),
    )
    if not result["success"]:
        text = f"Error creating calendar event: {result.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}
    # Echo back the client-confirmed start time when available.
    text = (
        f"Calendar event created successfully!\n"
        f"Title: {summary}\nStart: {result.get('start', start_time)}\n"
        f"Event ID: {result.get('event_id', 'unknown')}\nLink: {result.get('html_link', '')}"
    )
    return {"content": [{"type": "text", "text": text}], "isError": False}
@tool(
    name="search_calendar",
    description="Search calendar events by text query. Searches event titles and descriptions.",
    input_schema={"query": str, "calendar_id": str},
)
async def search_calendar(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search Google Calendar events matching a free-text query.

    Required args: query. Optional: calendar_id (default "primary").
    """
    query = args["query"]
    calendar_id = args.get("calendar_id", "primary")
    _, calendar_client, _ = _initialize_google_clients()
    if not calendar_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }
    result = calendar_client.search_events(query=query, calendar_id=calendar_id)
    if not result["success"]:
        text = f"Error searching calendar: {result.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}
    summary = result.get("summary", "No events found")
    # Keep tool output bounded.
    if len(summary) > _MAX_TOOL_OUTPUT:
        summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
    text = f"Calendar search results for '{query}':\n\n{summary}"
    return {"content": [{"type": "text", "text": text}], "isError": False}
# ============================================
# Contacts Tools (MCP)
# ============================================
@tool(
    name="create_contact",
    description="Create a new Google contact. Requires prior OAuth setup (--setup-google).",
    input_schema={"given_name": str, "family_name": str, "email": str, "phone": str, "notes": str},
)
async def create_contact(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a contact through the Google People client.

    Required args: given_name. Optional: family_name, email, phone, notes.
    """
    given_name = args["given_name"]
    _, _, people_client = _initialize_google_clients()
    if not people_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }
    result = people_client.create_contact(
        given_name=given_name,
        family_name=args.get("family_name", ""),
        email=args.get("email", ""),
        phone=args.get("phone"),
        notes=args.get("notes"),
    )
    if not result["success"]:
        text = f"Error creating contact: {result.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}
    # Fall back to the requested given name if the client returns no name.
    text = f"Contact created: {result.get('name', given_name)}\nResource: {result.get('resource_name', '')}"
    return {"content": [{"type": "text", "text": text}], "isError": False}
@tool(
    name="list_contacts",
    description="List or search Google contacts. Without a query, lists all contacts sorted by last name.",
    input_schema={"max_results": int, "query": str},
)
async def list_contacts(args: Dict[str, Any]) -> Dict[str, Any]:
    """List or search Google contacts and return a text summary.

    Optional args: max_results (default 100), query (None lists everything).
    """
    _, _, people_client = _initialize_google_clients()
    if not people_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }
    result = people_client.list_contacts(
        max_results=args.get("max_results", 100),
        query=args.get("query"),
    )
    if not result["success"]:
        text = f"Error listing contacts: {result.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}
    summary = result.get("summary", "No contacts found.")
    # Keep tool output bounded.
    if len(summary) > _MAX_TOOL_OUTPUT:
        summary = summary[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
    text = f"Contacts ({result.get('count', 0)} found):\n\n{summary}"
    return {"content": [{"type": "text", "text": text}], "isError": False}
@tool(
    name="get_contact",
    description="Get full details of a specific Google contact by resource name.",
    input_schema={"resource_name": str},
)
async def get_contact(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch one Google contact by People API resource name.

    Required args: resource_name (e.g. "people/c123...").
    """
    resource_name = args["resource_name"]
    _, _, people_client = _initialize_google_clients()
    if not people_client:
        return {
            "content": [{"type": "text", "text": "Error: Google not authorized. Run: python bot_runner.py --setup-google"}],
            "isError": True
        }
    result = people_client.get_contact(resource_name=resource_name)
    if not result["success"]:
        text = f"Error getting contact: {result.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": text}], "isError": True}
    c = result.get("contact", {})
    # Prefer the display name; otherwise assemble given + family names.
    name = c.get("display_name") or f"{c.get('given_name', '')} {c.get('family_name', '')}".strip()
    lines = [f"Name: {name or '(no name)'}"]
    # Only show fields the contact actually has.
    for label, key in (("Email", "email"), ("Phone", "phone"), ("Notes", "notes")):
        if c.get(key):
            lines.append(f"{label}: {c[key]}")
    lines.append(f"Resource: {c.get('resource_name', resource_name)}")
    return {"content": [{"type": "text", "text": "\n".join(lines)}], "isError": False}
# ============================================
# Gitea Tools (MCP) - Private repo access
# ============================================
# Lazy-loaded Gitea client
_gitea_client: Optional[Any] = None
def _get_gitea_client():
"""Lazy-load Gitea client when first needed."""
global _gitea_client
if _gitea_client is not None:
return _gitea_client
try:
from gitea_tools.client import get_gitea_client
_gitea_client = get_gitea_client()
return _gitea_client
except Exception as e:
print(f"[MCP Gitea] Failed to initialize: {e}")
return None
@tool(
    name="gitea_read_file",
    description="Read a file from a Gitea repository. Use this to access files from Jordan's homelab repo or any configured Gitea repo. Returns the file content as text.",
    input_schema={
        "file_path": str,
        "repo": str,
        "branch": str,
    },
)
async def gitea_read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch one file's content from a Gitea repository.

    Required args: file_path. Optional: repo (may be "owner/name"), branch.
    Runs in-process against the configured Gitea instance.
    """
    file_path = args.get("file_path", "")
    if not file_path:
        return {
            "content": [{"type": "text", "text": "Error: file_path is required"}],
            "isError": True,
        }
    client = _get_gitea_client()
    if client is None:
        return {
            "content": [{
                "type": "text",
                "text": (
                    "Error: Gitea not configured. "
                    "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
                    "and add your Personal Access Token."
                ),
            }],
            "isError": True,
        }
    # "owner/name" form splits into separate owner and repo values.
    repo = args.get("repo")
    owner = None
    if repo and "/" in repo:
        owner, repo = repo.split("/", 1)
    result = await client.get_file_content(
        file_path=file_path,
        owner=owner,
        repo=repo,
        branch=args.get("branch"),
    )
    if not result["success"]:
        return {
            "content": [{"type": "text", "text": f"Error: {result['error']}"}],
            "isError": True,
        }
    metadata = result.get("metadata", {})
    header = f"File: {metadata.get('path', file_path)} ({metadata.get('size', 0):,} bytes)"
    if metadata.get("truncated"):
        header += " [TRUNCATED]"
    return {
        "content": [{"type": "text", "text": f"{header}\n\n{result['content']}"}],
    }
@tool(
    name="gitea_list_files",
    description="List files and folders in a directory in a Gitea repository. Use this to explore the structure of Jordan's homelab repo or any configured Gitea repo.",
    input_schema={
        "path": str,
        "repo": str,
        "branch": str,
    },
)
async def gitea_list_files_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """List the entries of one directory in a Gitea repository.

    Optional args: path (default repo root), repo ("owner/name" allowed),
    branch. Runs in-process against the configured Gitea instance.
    """
    client = _get_gitea_client()
    if client is None:
        return {
            "content": [{
                "type": "text",
                "text": (
                    "Error: Gitea not configured. "
                    "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
                    "and add your Personal Access Token."
                ),
            }],
            "isError": True,
        }
    # "owner/name" form splits into separate owner and repo values.
    repo = args.get("repo")
    owner = None
    if repo and "/" in repo:
        owner, repo = repo.split("/", 1)
    result = await client.list_files(
        path=args.get("path", ""),
        owner=owner,
        repo=repo,
        branch=args.get("branch"),
    )
    if not result["success"]:
        return {
            "content": [{"type": "text", "text": f"Error: {result['error']}"}],
            "isError": True,
        }
    # One line per entry, directories marked with a trailing slash.
    lines = [f"Directory: {result.get('repo', '')}/{result.get('path', '/')} ({result.get('count', 0)} items)\n"]
    for f in result["files"]:
        if f["type"] == "dir":
            lines.append(f" DIR {f['name']}/")
        else:
            size_str = f"({f['size']:,} bytes)" if f["size"] else ""
            lines.append(f" FILE {f['name']} {size_str}")
    return {
        "content": [{"type": "text", "text": "\n".join(lines)}],
    }
@tool(
    name="gitea_search_code",
    description="Search for files by name/path in a Gitea repository. Searches file and directory names. For content search, use gitea_read_file on specific files.",
    input_schema={
        "query": str,
        "repo": str,
    },
)
async def gitea_search_code_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search a Gitea repository's tree for matching file/directory names.

    Required args: query. Optional: repo ("owner/name" allowed).
    Name/path matching only — not a full-text content search.
    """
    query = args.get("query", "")
    if not query:
        return {
            "content": [{"type": "text", "text": "Error: query is required"}],
            "isError": True,
        }
    client = _get_gitea_client()
    if client is None:
        return {
            "content": [{
                "type": "text",
                "text": (
                    "Error: Gitea not configured. "
                    "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
                    "and add your Personal Access Token."
                ),
            }],
            "isError": True,
        }
    # "owner/name" form splits into separate owner and repo values.
    repo = args.get("repo")
    owner = None
    if repo and "/" in repo:
        owner, repo = repo.split("/", 1)
    result = await client.search_code(
        query=query,
        owner=owner,
        repo=repo,
    )
    if not result["success"]:
        return {
            "content": [{"type": "text", "text": f"Error: {result['error']}"}],
            "isError": True,
        }
    matches = result.get("results", [])
    if not matches:
        return {
            "content": [{"type": "text", "text": result.get("message", f"No results for '{query}'")}],
        }
    lines = [f"Search results for '{query}' in {result.get('repo', '')} ({result.get('count', 0)} matches):\n"]
    for r in matches:
        type_icon = "DIR " if r["type"] == "dir" else "FILE"
        size_str = f"({r['size']:,} bytes)" if r.get("size") else ""
        lines.append(f" {type_icon} {r['path']} {size_str}")
    return {
        "content": [{"type": "text", "text": "\n".join(lines)}],
    }
@tool(
    name="gitea_get_tree",
    description="Get the directory tree structure from a Gitea repository. Shows all files and folders. Use recursive=true for the full tree.",
    input_schema={
        "repo": str,
        "branch": str,
        "recursive": bool,
    },
)
async def gitea_get_tree_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Render a Gitea repository's tree as indented text.

    Optional args: repo ("owner/name" allowed), branch, recursive (default
    False). Output is capped at _MAX_TOOL_OUTPUT characters.
    """
    client = _get_gitea_client()
    if client is None:
        return {
            "content": [{
                "type": "text",
                "text": (
                    "Error: Gitea not configured. "
                    "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
                    "and add your Personal Access Token."
                ),
            }],
            "isError": True,
        }
    # "owner/name" form splits into separate owner and repo values.
    repo = args.get("repo")
    owner = None
    if repo and "/" in repo:
        owner, repo = repo.split("/", 1)
    result = await client.get_tree(
        owner=owner,
        repo=repo,
        branch=args.get("branch"),
        recursive=args.get("recursive", False),
    )
    if not result["success"]:
        return {
            "content": [{"type": "text", "text": f"Error: {result['error']}"}],
            "isError": True,
        }
    header = f"Tree: {result.get('repo', '')} (branch: {result.get('branch', 'main')}, {result.get('total', 0)} entries)"
    if result.get("truncated", False):
        header += " [TRUNCATED - tree too large]"
    lines = [header, ""]
    for entry in result.get("entries", []):
        if entry["type"] == "dir":
            lines.append(f" {entry['path']}/")
        else:
            size_str = f"({entry['size']:,} bytes)" if entry.get("size") else ""
            lines.append(f" {entry['path']} {size_str}")
    text = "\n".join(lines)
    # Hard cap — a full recursive tree can easily exceed the token budget.
    if len(text) > _MAX_TOOL_OUTPUT:
        text = text[:_MAX_TOOL_OUTPUT] + "\n\n... (tree truncated, use gitea_list_files for specific directories)"
    return {
        "content": [{"type": "text", "text": text}],
    }
# Create the MCP server with all tools.
# This runs at import time; every tool listed here becomes available to the
# agent via the in-process "file_system" MCP server. The list order below is
# preserved as-is — add new tools under the matching category comment.
file_system_server = create_sdk_mcp_server(
    name="file_system",
    version="2.0.0",
    tools=[
        # File and system tools
        read_file_tool,
        write_file_tool,
        edit_file_tool,
        list_directory_tool,
        run_command_tool,
        # Web tool
        web_fetch_tool,
        # Zettelkasten tools
        fleeting_note_tool,
        daily_note_tool,
        literature_note_tool,
        permanent_note_tool,
        search_vault_tool,
        search_by_tags_tool,
        # Weather
        get_weather,
        # Gmail tools
        send_email,
        read_emails,
        get_email,
        # Calendar tools
        read_calendar,
        create_calendar_event,
        search_calendar,
        # Contacts tools
        create_contact,
        list_contacts,
        get_contact,
        # Gitea tools
        gitea_read_file_tool,
        gitea_list_files_tool,
        gitea_search_code_tool,
        gitea_get_tree_tool,
    ]
)