2026-02-16 07:43:31 -07:00
"""MCP Tools - In-process tools using Claude Agent SDK.

These tools run directly in the Python process for better performance
compared to the traditional tool execution flow. File and system tools
are ideal candidates for MCP since they don't require external APIs.
"""
from pathlib import Path
import subprocess
from typing import Any , Dict , List , Optional
from urllib . parse import urlparse
from datetime import datetime
from claude_agent_sdk import tool , create_sdk_mcp_server
import httpx
from bs4 import BeautifulSoup
# Import memory system for hybrid search
try :
from memory_system import MemorySystem
MEMORY_AVAILABLE = True
except ImportError :
MEMORY_AVAILABLE = False
# Maximum characters of tool output to return (prevents token explosion).
# The file-read and shell-command tools truncate their text to this length.
_MAX_TOOL_OUTPUT = 5000
# Maximum page size for web fetching (500KB). Compared against the byte
# length of the downloaded response body.
_MAX_WEB_PAGE_SIZE = 500_000
# Maximum text content from web page (10,000 chars ≈ 2,500 tokens)
_MAX_WEB_TEXT = 10_000
# Zettelkasten vault paths. The root is a relative path, so it is resolved
# against the current working directory of the process.
_VAULT_ROOT = Path ( " memory_workspace/obsidian " )
# One sub-folder per note type; created on demand by _ensure_vault_structure().
_VAULT_FLEETING = _VAULT_ROOT / " fleeting "
_VAULT_DAILY = _VAULT_ROOT / " daily "
_VAULT_PERMANENT = _VAULT_ROOT / " permanent "
_VAULT_LITERATURE = _VAULT_ROOT / " literature "
def _generate_note_id ( ) - > str :
""" Generate unique timestamp-based note ID (YYYYMMDDHHmmss). """
return datetime . now ( ) . strftime ( " % Y % m %d % H % M % S " )
def _create_frontmatter ( note_id : str , title : str , tags : list = None , note_type : str = " fleeting " ) - > str :
""" Create YAML front matter for zettelkasten note. """
now = datetime . now ( )
tags_str = " , " . join ( tags ) if tags else " "
return f """ ---
id : { note_id }
title : { title }
created : { now . strftime ( " % Y- % m- %d T % H: % M: % S " ) }
modified : { now . strftime ( " % Y- % m- %d T % H: % M: % S " ) }
type : { note_type }
tags : [ { tags_str } ]
- - -
"""
def _ensure_vault_structure():
    """Create any zettelkasten vault folder that does not exist yet."""
    vault_dirs = (_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE)
    for vault_dir in vault_dirs:
        # parents=True also creates the vault root; exist_ok makes repeated
        # calls harmless.
        vault_dir.mkdir(parents=True, exist_ok=True)
def _get_memory_system() -> Optional['MemorySystem']:
    """Build a MemorySystem rooted at the obsidian vault, if possible.

    Returns:
        A new MemorySystem instance, or None when the memory_system module
        could not be imported or constructing the instance failed.
    """
    if MEMORY_AVAILABLE:
        try:
            # Point the memory system at the obsidian vault.
            return MemorySystem(workspace_dir=_VAULT_ROOT)
        except Exception:
            # Best effort: callers treat None as "hybrid search unavailable".
            return None
    return None
def _find_related_notes_hybrid(title: str, content: str, max_results: int = 5) -> List[tuple]:
    """Find existing notes related to the given title and content.

    Hybrid search (via the memory system) is tried first. If it is not
    available or fails, a keyword scan over the fleeting, permanent and
    literature folders is used instead.

    Args:
        title: Title of the note being created.
        content: Body of the note being created.
        max_results: Maximum number of related notes to return.

    Returns:
        List of ``(note_title, score)`` tuples. For hybrid results the score
        is the number of ``**``-delimited highlights in the snippet; for the
        keyword fallback it is the number of distinct search terms found.
    """
    search_dirs = [_VAULT_FLEETING, _VAULT_PERMANENT, _VAULT_LITERATURE]

    def _display_title(note_path: Path) -> str:
        # Drop everything up to the first '-' (the ID prefix the note tools
        # put in the filename) and any padding left around the title.
        stem = note_path.stem
        if '-' in stem:
            stem = stem.split('-', 1)[1]
        return stem.strip()

    memory = _get_memory_system()
    if memory:
        try:
            # Index the vault so the search sees the current notes.
            for vault_dir in search_dirs:
                if vault_dir.exists():
                    for note_path in vault_dir.glob("*.md"):
                        memory.index_file(note_path)
            # Title plus the first 500 characters of content as the query.
            query = f"{title} {content[:500]}"
            results = memory.search_hybrid(query, max_results=max_results)
            return [
                (
                    _display_title(Path(result["path"])),
                    result.get("snippet", "").count("**") // 2,
                )
                for result in results
            ]
        except Exception:
            # Deliberate best effort: any hybrid-search failure falls
            # through to the keyword scan below.
            pass

    # Fallback: keyword search. Terms are de-duplicated (order preserved) so
    # a word appearing in both title and content is not counted twice, which
    # previously let a single shared word reach the two-match threshold.
    words = title.lower().split() + content.lower().split()[:20]
    search_terms = list(dict.fromkeys(word for word in words if len(word) > 4))

    related_notes = []
    for vault_dir in search_dirs:
        if not vault_dir.exists():
            continue
        for note_path in vault_dir.glob("*.md"):
            try:
                note_content = note_path.read_text(encoding="utf-8").lower()
            except (OSError, UnicodeDecodeError):
                # Unreadable notes are skipped rather than failing the search.
                continue
            matches = sum(1 for term in search_terms if term in note_content)
            if matches >= 2:
                related_notes.append((_display_title(note_path), matches))

    related_notes.sort(key=lambda item: item[1], reverse=True)
    return related_notes[:max_results]
def _is_safe_url ( url : str ) - > bool :
""" Validate URL safety - blocks localhost, private IPs, and file:// URLs. """
try :
parsed = urlparse ( url )
# Must be HTTP/HTTPS
if parsed . scheme not in [ ' http ' , ' https ' ] :
return False
# Must have a hostname
if not parsed . hostname :
return False
# Block localhost and loopback IPs
blocked_hosts = [ ' localhost ' , ' 127.0.0.1 ' , ' 0.0.0.0 ' , ' ::1 ' ]
if parsed . hostname in blocked_hosts :
return False
# Block private IP ranges (10.x.x.x, 192.168.x.x, 172.16-31.x.x)
if parsed . hostname :
parts = parsed . hostname . split ( ' . ' )
if len ( parts ) == 4 and parts [ 0 ] . isdigit ( ) :
first_octet = int ( parts [ 0 ] )
# Check for private ranges
if first_octet == 10 : # 10.0.0.0/8
return False
if first_octet == 192 and len ( parts ) > 1 and parts [ 1 ] . isdigit ( ) and int ( parts [ 1 ] ) == 168 : # 192.168.0.0/16
return False
if first_octet == 172 and len ( parts ) > 1 and parts [ 1 ] . isdigit ( ) :
second_octet = int ( parts [ 1 ] )
if 16 < = second_octet < = 31 : # 172.16.0.0/12
return False
return True
except Exception :
return False
@tool(
    name="read_file",
    description="Read the contents of a file. Use this to view configuration files, code, or any text file.",
    input_schema={
        "file_path": str,
    },
)
async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Return the UTF-8 text of the file named by ``args["file_path"]``.

    Output longer than ``_MAX_TOOL_OUTPUT`` characters is cut off and marked
    as truncated. A missing file or a read failure yields an ``isError``
    payload instead of raising.
    """
    file_path = args["file_path"]
    target = Path(file_path)

    def _failure(message: str) -> Dict[str, Any]:
        return {"content": [{"type": "text", "text": message}], "isError": True}

    if not target.exists():
        return _failure(f"Error: File not found: {file_path}")

    try:
        text = target.read_text(encoding="utf-8")
    except Exception as exc:
        return _failure(f"Error reading file: {str(exc)}")

    if len(text) > _MAX_TOOL_OUTPUT:
        text = text[:_MAX_TOOL_OUTPUT] + "\n... (file truncated)"
    body = f"Content of {file_path}:\n\n{text}"
    return {"content": [{"type": "text", "text": body}]}
@tool(
    name="write_file",
    description="Write content to a file. Creates a new file or overwrites existing file completely.",
    input_schema={
        "file_path": str,
        "content": str,
    },
)
async def write_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Write ``args["content"]`` to ``args["file_path"]`` as UTF-8.

    Missing parent directories are created first. An existing file is
    replaced entirely. Failures are reported as an ``isError`` payload.
    """
    file_path = args["file_path"]
    content = args["content"]
    destination = Path(file_path)

    try:
        # Make sure the folder chain exists before writing.
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(content, encoding="utf-8")
    except Exception as exc:
        return {
            "content": [{"type": "text", "text": f"Error writing file: {str(exc)}"}],
            "isError": True,
        }

    summary = f"Successfully wrote to {file_path} ({len(content)} characters)"
    return {"content": [{"type": "text", "text": summary}]}
@tool(
    name="edit_file",
    description="Edit a file by replacing specific text. Use this to make targeted changes without rewriting the entire file.",
    input_schema={
        "file_path": str,
        "old_text": str,
        "new_text": str,
    },
)
async def edit_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Replace the first occurrence of ``old_text`` with ``new_text`` in a file.

    Returns an ``isError`` payload when the file does not exist, when
    ``old_text`` is not present, or when reading/writing fails.
    """
    file_path = args["file_path"]
    old_text = args["old_text"]
    new_text = args["new_text"]
    target = Path(file_path)

    def _failure(message: str) -> Dict[str, Any]:
        return {"content": [{"type": "text", "text": message}], "isError": True}

    if not target.exists():
        return _failure(f"Error: File not found: {file_path}")

    try:
        original = target.read_text(encoding="utf-8")
        if old_text in original:
            # count=1: only the first match is replaced.
            target.write_text(original.replace(old_text, new_text, 1), encoding="utf-8")
            return {
                "content": [{
                    "type": "text",
                    "text": f"Successfully edited {file_path}. Replaced 1 occurrence.",
                }]
            }
        return _failure(
            f"Error: Text not found in file. Could not find:\n{old_text[:100]}..."
        )
    except Exception as exc:
        return _failure(f"Error editing file: {str(exc)}")
@tool(
    name="list_directory",
    description="List files and directories in a given path. Useful for exploring the file system.",
    input_schema={
        "path": str,
    },
)
async def list_directory_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """List the entries of ``args["path"]`` (default ``"."``) in sorted order.

    Each entry is labelled DIR or FILE; files also show their size in bytes.
    A missing path, a non-directory path or a listing failure produces an
    ``isError`` payload.
    """
    dir_path = Path(args.get("path", "."))

    def _failure(message: str) -> Dict[str, Any]:
        return {"content": [{"type": "text", "text": message}], "isError": True}

    def _describe(item: Path) -> str:
        if item.is_dir():
            return f"  DIR {item.name}"
        return f"  FILE {item.name} ({item.stat().st_size} bytes)"

    if not dir_path.exists():
        return _failure(f"Error: Directory not found: {dir_path}")
    if not dir_path.is_dir():
        return _failure(f"Error: Not a directory: {dir_path}")

    try:
        items = [_describe(item) for item in sorted(dir_path.iterdir())]
        if items:
            listing = f"Contents of {dir_path}:\n" + "\n".join(items)
        else:
            listing = f"Directory {dir_path} is empty"
        return {"content": [{"type": "text", "text": listing}]}
    except Exception as exc:
        return _failure(f"Error listing directory: {str(exc)}")
@tool(
    name="run_command",
    description="Execute a shell command. Use for git operations, running scripts, installing packages, etc.",
    input_schema={
        "command": str,
        "working_dir": str,
    },
)
async def run_command_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a shell command and return its exit code, stdout and stderr.

    The command runs through the shell (by design - this tool exists to run
    arbitrary commands) with a 30 second timeout. The blocking
    ``subprocess.run`` call is handed to the default executor so the event
    loop keeps serving other work while the command runs.

    Args:
        args: ``command`` (required) and ``working_dir`` (optional; a missing
            or empty value means the current directory).

    Returns:
        A tool payload; ``isError`` is True for a non-zero exit code, a
        timeout, or a failure to start the command.
    """
    import asyncio
    import functools

    command = args["command"]
    # An empty working_dir would make subprocess fail; treat it as "here".
    working_dir = args.get("working_dir") or "."

    def _clip(stream_text: str, label: str) -> str:
        # Cap each stream separately so one noisy stream cannot hide the other.
        if len(stream_text) > _MAX_TOOL_OUTPUT:
            return stream_text[:_MAX_TOOL_OUTPUT] + f"\n... ({label} truncated)"
        return stream_text

    run_blocking = functools.partial(
        subprocess.run,
        command,
        shell=True,
        cwd=working_dir,
        capture_output=True,
        text=True,
        timeout=30,
    )

    try:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(None, run_blocking)
    except subprocess.TimeoutExpired:
        return {
            "content": [{"type": "text", "text": "Error: Command timed out after 30 seconds"}],
            "isError": True,
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error running command: {str(e)}"}],
            "isError": True,
        }

    output = []
    if result.stdout:
        output.append(f"STDOUT:\n{_clip(result.stdout, 'stdout')}")
    if result.stderr:
        output.append(f"STDERR:\n{_clip(result.stderr, 'stderr')}")

    text = f"Command exited with code {result.returncode}"
    if output:
        text = text + "\n\n" + "\n\n".join(output)

    return {
        "content": [{"type": "text", "text": text}],
        "isError": result.returncode != 0,
    }
@tool(
    name="web_fetch",
    description="Fetch and parse content from a web page. Returns the page text content for analysis. Use this to get real-time information from the web.",
    input_schema={
        "url": str,
    },
)
async def web_fetch_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch a web page and return its text content.

    The HTML is downloaded, stripped of script/style/navigation elements and
    converted to plain text, truncated to ``_MAX_WEB_TEXT`` characters.

    Redirects are followed manually (at most 10 hops) so every hop can be
    checked with ``_is_safe_url``; letting the HTTP client follow redirects
    on its own would allow a public URL to bounce to an internal address
    after the initial check.

    Args:
        args: ``url`` - the http/https URL to fetch.

    Returns:
        A tool payload with the final URL, page title and text, or an
        ``isError`` payload for unsafe URLs, timeouts, HTTP errors, oversized
        pages and other failures.
    """
    url = args["url"]
    max_redirects = 10
    redirect_statuses = (301, 302, 303, 307, 308)

    def _failure(message: str) -> Dict[str, Any]:
        return {"content": [{"type": "text", "text": message}], "isError": True}

    def _blocked(target: str) -> Dict[str, Any]:
        return _failure(
            f"Error: Blocked unsafe URL - {target}\n\n"
            "Only http/https URLs to public domains are allowed."
        )

    # Security: validate the URL before any request is made.
    if not _is_safe_url(url):
        return _blocked(url)

    try:
        headers = {
            "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)"
        }
        async with httpx.AsyncClient(
            timeout=10.0,
            follow_redirects=False,
            limits=httpx.Limits(max_connections=5),
            headers=headers,
        ) as client:
            current_url = url
            for _ in range(max_redirects + 1):
                response = await client.get(current_url)
                location = response.headers.get("location")
                if response.status_code not in redirect_statuses or not location:
                    break
                # Location may be relative; resolve it against the current URL.
                current_url = str(response.url.join(location))
                if not _is_safe_url(current_url):
                    return _blocked(current_url)
            else:
                return _failure(f"Error: Too many redirects - {url}")

            response.raise_for_status()

        # Size limit is applied to the downloaded body.
        if len(response.content) > _MAX_WEB_PAGE_SIZE:
            return _failure(
                f"Error: Page too large ({len(response.content)} bytes, max {_MAX_WEB_PAGE_SIZE})"
            )

        soup = BeautifulSoup(response.content, 'html.parser')

        # Remove elements that carry no article text.
        for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']):
            element.decompose()

        text = soup.get_text(separator='\n', strip=True)
        text = '\n'.join(line.strip() for line in text.split('\n') if line.strip())

        if len(text) > _MAX_WEB_TEXT:
            text = text[:_MAX_WEB_TEXT] + "\n\n... (content truncated, page is very long)"

        # .string is None for an empty <title> or one with nested markup.
        raw_title = soup.title.string if soup.title else None
        title = " ".join(raw_title.split()) if raw_title else ""
        if not title:
            title = "No title"

        return {
            "content": [{
                "type": "text",
                "text": f"Fetched: {response.url}\nTitle: {title}\n\n{text}",
            }]
        }
    except httpx.TimeoutException:
        return _failure(f"Error: Request to {url} timed out after 10 seconds")
    except httpx.HTTPStatusError as e:
        return _failure(f"Error: HTTP {e.response.status_code} - {url}")
    except Exception as e:
        return _failure(f"Error fetching webpage: {str(e)}")
@tool(
    name="fleeting_note",
    description="Quickly capture a thought or idea as a fleeting note in your zettelkasten. Use this for quick captures that can be processed later into permanent notes.",
    input_schema={
        "content": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def fleeting_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a fleeting note (quick capture) in the zettelkasten vault.

    The title is the first line of the content (at most 50 characters); when
    that line is blank the title falls back to "Quick Note". The note is
    always tagged ``fleeting`` in addition to any supplied tags.

    Args:
        args: ``content`` (required) and ``tags`` (optional, comma-separated).

    Returns:
        A tool payload naming the created file, or an ``isError`` payload.
    """
    content = args["content"]
    tags_str = args.get("tags") or ""

    try:
        _ensure_vault_structure()

        note_id = _generate_note_id()

        # str.split always returns at least one element, so the fallback must
        # test the text itself rather than the list.
        first_line = content.split('\n', 1)[0].strip()
        title = first_line[:50].strip() or "Quick Note"

        # Skip blank entries such as those produced by "a,,b" or "a,".
        tags = [tag.strip() for tag in tags_str.split(',') if tag.strip()]
        tags.append("fleeting")  # Always tag as fleeting

        filename = f"{note_id} - {title.replace(':', '').replace('/', '-')}.md"
        note_path = _VAULT_FLEETING / filename

        frontmatter = _create_frontmatter(note_id, title, tags, "fleeting")
        note_path.write_text(frontmatter + content, encoding="utf-8")

        return {
            "content": [{
                "type": "text",
                "text": f"Created fleeting note: {filename}\nID: {note_id}\nLocation: {note_path}",
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error creating fleeting note: {str(e)}"}],
            "isError": True,
        }
@tool(
    name="daily_note",
    description="Add an entry to today's daily note. Use this for journaling, logging activities, or tracking daily thoughts.",
    input_schema={
        "entry": str,
    },
)
async def daily_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Add a time-stamped entry to today's daily note.

    The note lives at ``<daily vault>/<YYYY-MM-DD>.md``. If it already exists
    the entry is appended under a ``## HH:MM`` heading; otherwise a new note
    with front matter and a date heading is created.
    """
    entry = args["entry"]

    try:
        _ensure_vault_structure()

        moment = datetime.now()
        date_str = moment.strftime("%Y-%m-%d")
        time_str = moment.strftime("%H:%M")

        note_path = _VAULT_DAILY / f"{date_str}.md"

        if not note_path.exists():
            # First entry of the day: front matter, date heading, then entry.
            updated_content = "\n".join([
                "---",
                f"date: {date_str}",
                "type: daily",
                "tags: [daily-note]",
                "---",
                f"# {date_str}",
                f"## {time_str}",
                f"{entry}",
                "",
            ])
        else:
            # Later entries are appended below the existing text.
            existing = note_path.read_text(encoding="utf-8")
            updated_content = existing + f"\n## {time_str}\n{entry}\n"

        note_path.write_text(updated_content, encoding="utf-8")

        confirmation = (
            f"Added entry to daily note: {date_str}\nTime: {time_str}\nLocation: {note_path}"
        )
        return {"content": [{"type": "text", "text": confirmation}]}
    except Exception as exc:
        return {
            "content": [{"type": "text", "text": f"Error updating daily note: {str(exc)}"}],
            "isError": True,
        }
@tool(
    name="literature_note",
    description="Create a literature note from a web article. Fetches the article, extracts key points, and creates a properly formatted zettelkasten note with source citation.",
    input_schema={
        "url": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def literature_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a literature note from a web article.

    The page is fetched and reduced to plain text (at most ``_MAX_WEB_TEXT``
    characters), then written to the literature folder with front matter and
    a source citation. The note is tagged ``literature`` and ``web-article``.

    Redirects are followed manually (at most 10 hops) and every hop is
    checked with ``_is_safe_url`` so a public URL cannot bounce the request
    to an internal address.

    Args:
        args: ``url`` (required) and ``tags`` (optional, comma-separated).

    Returns:
        A tool payload describing the created note, or an ``isError`` payload.
    """
    url = args["url"]
    tags_str = args.get("tags") or ""
    max_redirects = 10
    redirect_statuses = (301, 302, 303, 307, 308)

    def _failure(message: str) -> Dict[str, Any]:
        return {"content": [{"type": "text", "text": message}], "isError": True}

    try:
        _ensure_vault_structure()

        if not _is_safe_url(url):
            return _failure(f"Error: Blocked unsafe URL - {url}")

        headers = {
            "User-Agent": "Garvis/1.0 (Personal Assistant Bot; +https://github.com/anthropics/claude-code)"
        }
        async with httpx.AsyncClient(timeout=10.0, follow_redirects=False, headers=headers) as client:
            current_url = url
            for _ in range(max_redirects + 1):
                response = await client.get(current_url)
                location = response.headers.get("location")
                if response.status_code not in redirect_statuses or not location:
                    break
                # Location may be relative; resolve it against the current URL.
                current_url = str(response.url.join(location))
                if not _is_safe_url(current_url):
                    return _failure(f"Error: Blocked unsafe URL - {current_url}")
            else:
                return _failure("Error: Too many redirects")

            response.raise_for_status()

        if len(response.content) > _MAX_WEB_PAGE_SIZE:
            return _failure("Error: Page too large")

        soup = BeautifulSoup(response.content, 'html.parser')
        for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form']):
            element.decompose()

        text = soup.get_text(separator='\n', strip=True)
        text = '\n'.join(line.strip() for line in text.split('\n') if line.strip())[:_MAX_WEB_TEXT]

        # .string is None for an empty <title> or one with nested markup, and
        # real titles often span several lines; normalise to a single line so
        # the filename and front matter stay valid.
        raw_title = soup.title.string if soup.title else None
        title = " ".join(raw_title.split()) if raw_title else ""
        if not title:
            title = "Untitled Article"

        note_id = _generate_note_id()
        # Skip blank entries such as those produced by "a,,b" or "a,".
        tags = [tag.strip() for tag in tags_str.split(',') if tag.strip()]
        tags.extend(["literature", "web-article"])

        captured = datetime.now().strftime("%Y-%m-%d")
        note_content = f"""# {title}
**Source**: {url}
**Captured**: {captured}
## Content
{text}
## Notes
(Add your thoughts, key takeaways, and connections to other notes here)
## Related
-
---
*This is a literature note. Process it into permanent notes with key insights.*
"""

        filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md"
        note_path = _VAULT_LITERATURE / filename

        frontmatter = _create_frontmatter(note_id, title, tags, "literature")
        note_path.write_text(frontmatter + note_content, encoding="utf-8")

        return {
            "content": [{
                "type": "text",
                "text": f"Created literature note from article\nTitle: {title}\nID: {note_id}\nLocation: {note_path}",
            }]
        }
    except httpx.TimeoutException:
        return _failure("Error: Request timed out")
    except Exception as e:
        return _failure(f"Error creating literature note: {str(e)}")
@tool(
    name="permanent_note",
    description="Create a permanent note with automatic link suggestions to related notes. Use this for refined, well-thought-out notes that form the core of your knowledge base.",
    input_schema={
        "title": str,
        "content": str,
        "tags": str,  # Comma-separated tags (optional)
    },
)
async def permanent_note_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a permanent note with suggested links to related notes.

    Related notes come from ``_find_related_notes_hybrid`` (hybrid search
    when available, keyword scan otherwise) and are written as ``[[wiki
    links]]`` in a "Related Notes" section. The note is always tagged
    ``permanent`` in addition to any supplied tags.

    Args:
        args: ``title`` and ``content`` (required), ``tags`` (optional,
            comma-separated).

    Returns:
        A tool payload describing the created note and suggested links, or
        an ``isError`` payload.
    """
    title = args["title"]
    content = args["content"]
    tags_str = args.get("tags") or ""

    try:
        _ensure_vault_structure()

        note_id = _generate_note_id()

        # Skip blank entries such as those produced by "a,,b" or "a,".
        tags = [tag.strip() for tag in tags_str.split(',') if tag.strip()]
        tags.append("permanent")

        related_notes = _find_related_notes_hybrid(title, content, max_results=5)

        if related_notes:
            link_lines = "".join(f"- [[{note_title}]]\n" for note_title, _ in related_notes)
        else:
            link_lines = "- (No related notes found yet)\n"
        related_section = "\n## Related Notes\n" + link_lines

        created = datetime.now().strftime("%Y-%m-%d %H:%M")
        note_content = f"""# {title}
{content}
{related_section}
---
*Created: {created}*
"""

        filename = f"{note_id} - {title[:50].replace(':', '').replace('/', '-')}.md"
        note_path = _VAULT_PERMANENT / filename

        frontmatter = _create_frontmatter(note_id, title, tags, "permanent")
        note_path.write_text(frontmatter + note_content, encoding="utf-8")

        message_parts = [f"Created permanent note: {title}\nID: {note_id}\nLocation: {note_path}"]
        if related_notes:
            message_parts.append(f"\n\nSuggested links ({len(related_notes)} related notes):")
            message_parts.extend(
                f"\n- [[{note_title}]] ({matches} keyword matches)"
                for note_title, matches in related_notes
            )

        return {
            "content": [{
                "type": "text",
                "text": "".join(message_parts),
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error creating permanent note: {str(e)}"}],
            "isError": True,
        }
@tool(
    name="search_by_tags",
    description="Search zettelkasten vault by tags. Find all notes with specific tags or tag combinations.",
    input_schema={
        "tags": str,  # Comma-separated tags to search for
        "match_all": bool,  # If true, note must have ALL tags. If false, ANY tag matches (optional, default false)
        "limit": int,  # Max results (optional, default 10)
    },
)
async def search_by_tags_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search the vault for notes whose front matter carries the given tags.

    Only the ``tags:`` line of each note's front matter is inspected.
    Comparison is case-insensitive.

    Args:
        args: ``tags`` (required, comma-separated), ``match_all`` (optional,
            default False: any tag matches), ``limit`` (optional, default 10).

    Returns:
        A tool payload listing matching file names, a "no notes found"
        message, or an ``isError`` payload (including when no usable tag was
        supplied).
    """
    tags_str = args["tags"]
    match_all = args.get("match_all", False)
    limit = args.get("limit", 10)

    def _frontmatter_tags(note_text: str) -> Optional[List[str]]:
        # Return the lower-cased tags of a note, or None without a tags line.
        if not note_text.startswith("---"):
            return None
        parts = note_text.split("---", 2)
        if len(parts) < 2:
            return None
        for line in parts[1].split('\n'):
            if line.strip().startswith('tags:'):
                raw = line.split(':', 1)[1].strip().strip('[]')
                return [t.strip().lower() for t in raw.split(',') if t.strip()]
        return None

    try:
        _ensure_vault_structure()

        # Blank entries are dropped: a blank search tag used to match every
        # note whose tag list was empty.
        search_tags = [tag.strip().lower() for tag in tags_str.split(',') if tag.strip()]
        if not search_tags:
            return {
                "content": [{"type": "text", "text": "Error: No tags provided to search for"}],
                "isError": True,
            }

        matcher = all if match_all else any
        results = []
        for vault_dir in [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]:
            if not vault_dir.exists():
                continue
            for note_path in vault_dir.glob("*.md"):
                try:
                    note_tags = _frontmatter_tags(note_path.read_text(encoding="utf-8"))
                except (OSError, UnicodeDecodeError):
                    # Unreadable notes are skipped rather than failing the search.
                    continue
                if note_tags is not None and matcher(tag in note_tags for tag in search_tags):
                    results.append(note_path.name)

        results = results[:limit]

        if not results:
            match_type = "all" if match_all else "any"
            return {
                "content": [{
                    "type": "text",
                    "text": f"No notes found with {match_type} of tags: {tags_str}",
                }]
            }

        header = f"Found {len(results)} note(s) with tags '{tags_str}':\n\n"
        listing = "".join(f"{i}. {note_name}\n" for i, note_name in enumerate(results, 1))
        return {
            "content": [{
                "type": "text",
                "text": header + listing,
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error searching by tags: {str(e)}"}],
            "isError": True,
        }
@tool(
    name="search_vault",
    description="Search your zettelkasten vault for notes matching a query. Returns relevant notes with context. Optionally filter by tags.",
    input_schema={
        "query": str,
        "tags": str,  # Optional comma-separated tags to filter by
        "limit": int,  # Max results (optional, default 5)
    },
)
async def search_vault_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search the vault for notes matching a query, optionally filtered by tags.

    Hybrid search through the memory system is tried first; when it is
    unavailable, fails, or returns nothing, a case-insensitive substring
    scan over all vault folders is used. With a tag filter, a note is kept
    when its front matter has at least one of the given tags.

    Args:
        args: ``query`` (required), ``tags`` (optional, comma-separated),
            ``limit`` (optional, default 5).

    Returns:
        A tool payload with file names and snippets (at most 300 characters
        each), a "no notes found" message, or an ``isError`` payload.
    """
    query = args["query"]
    tags_filter = args.get("tags") or ""
    limit = args.get("limit", 5)
    vault_dirs = [_VAULT_FLEETING, _VAULT_DAILY, _VAULT_PERMANENT, _VAULT_LITERATURE]

    def _note_tags(note_text: str) -> Optional[List[str]]:
        # Return the lower-cased tags of a note, or None without a tags line.
        if not note_text.startswith("---"):
            return None
        parts = note_text.split("---", 2)
        if len(parts) < 2:
            return None
        for line in parts[1].split('\n'):
            if line.strip().startswith('tags:'):
                raw = line.split(':', 1)[1].strip().strip('[]')
                return [t.strip().lower() for t in raw.split(',') if t.strip()]
        return None

    try:
        _ensure_vault_structure()

        results = []

        # Try hybrid search first (None when the memory system is unavailable).
        memory = _get_memory_system()
        if memory:
            try:
                for vault_dir in vault_dirs:
                    if vault_dir.exists():
                        for note_path in vault_dir.glob("*.md"):
                            memory.index_file(note_path)

                # Over-fetch so the tag filter below still has candidates.
                hybrid_hits = []
                for hit in memory.search_hybrid(query, max_results=limit * 2):
                    note_path = Path(hit["path"])
                    hybrid_hits.append({
                        "file": note_path.name,
                        "path": str(note_path),
                        "snippet": hit.get("snippet", "")[:300],
                    })
                # Assigned only on full success so a failure midway cannot
                # leave partial results that would suppress the fallback.
                results = hybrid_hits
            except Exception:
                # Deliberate best effort: fall back to the simple search.
                pass

        # Fallback: simple keyword search.
        if not results:
            query_lower = query.lower()
            for vault_dir in vault_dirs:
                if not vault_dir.exists():
                    continue
                for note_path in vault_dir.glob("*.md"):
                    try:
                        content = note_path.read_text(encoding="utf-8")
                    except (OSError, UnicodeDecodeError):
                        continue
                    if query_lower not in content.lower():
                        continue
                    lines = content.split('\n')
                    for i, line in enumerate(lines):
                        if query_lower in line.lower():
                            # Snippet: the matching line plus two lines either side.
                            start = max(0, i - 2)
                            end = min(len(lines), i + 3)
                            snippet = '\n'.join(lines[start:end])
                            results.append({
                                "file": note_path.name,
                                "path": str(note_path),
                                "snippet": snippet[:300],
                            })
                            break

        # Filter by tags if specified. Blank entries are dropped: a blank
        # filter tag used to match every note whose tag list was empty.
        filter_tags = [t.strip().lower() for t in tags_filter.split(',') if t.strip()]
        if filter_tags:
            filtered_results = []
            for result in results:
                try:
                    note_text = Path(result["path"]).read_text(encoding="utf-8")
                except (OSError, UnicodeDecodeError):
                    continue
                note_tags = _note_tags(note_text)
                if note_tags is not None and any(tag in note_tags for tag in filter_tags):
                    filtered_results.append(result)
            results = filtered_results

        results = results[:limit]

        if not results:
            tag_msg = f" with tags '{tags_filter}'" if tags_filter else ""
            return {
                "content": [{
                    "type": "text",
                    "text": f"No notes found matching: {query}{tag_msg}",
                }]
            }

        tag_msg = f" (filtered by tags: {tags_filter})" if tags_filter else ""
        pieces = [f"Found {len(results)} note(s) matching '{query}'{tag_msg}:\n\n"]
        for i, result in enumerate(results, 1):
            pieces.append(f"{i}. {result['file']}\n")
            pieces.append(f"{result['snippet']}\n\n")

        return {
            "content": [{
                "type": "text",
                "text": "".join(pieces),
            }]
        }
    except Exception as e:
        return {
            "content": [{"type": "text", "text": f"Error searching vault: {str(e)}"}],
            "isError": True,
        }
2026-02-18 20:31:32 -07:00
# ============================================
# Google and Weather Tools (MCP Migration)
# ============================================
# Lazy-loaded Google clients
_gmail_client : Optional [ Any ] = None
_calendar_client : Optional [ Any ] = None
_people_client : Optional [ Any ] = None
def _initialize_google_clients ( ) :
""" Lazy-load Google API clients when needed. """
global _gmail_client , _calendar_client , _people_client
if _gmail_client is not None :
return _gmail_client , _calendar_client , _people_client
try :
from google_tools . gmail_client import GmailClient
from google_tools . calendar_client import CalendarClient
from google_tools . people_client import PeopleClient
from google_tools . oauth_manager import GoogleOAuthManager
oauth_manager = GoogleOAuthManager ( )
credentials = oauth_manager . get_credentials ( )
if not credentials :
return None , None , None
_gmail_client = GmailClient ( oauth_manager )
_calendar_client = CalendarClient ( oauth_manager )
_people_client = PeopleClient ( oauth_manager )
return _gmail_client , _calendar_client , _people_client
except Exception as e :
print ( f " [MCP Google] Failed to initialize: { e } " )
return None , None , None
@tool(
    name="get_weather",
    description="Get current weather for a location using OpenWeatherMap API. Returns temperature, conditions, and description.",
    input_schema={"location": str},
)
async def get_weather(args: Dict[str, Any]) -> Dict[str, Any]:
    """Get current weather for a location using the OpenWeatherMap API.

    The request is made with the async HTTP client the module already uses,
    over HTTPS, so the event loop is not blocked and the API key is not sent
    in clear text. Error messages never include the request URL because it
    carries the API key as a query parameter.

    Args:
        args: ``location`` (optional, defaults to "Phoenix, US").

    Returns:
        A tool payload with temperature (imperial units), conditions and
        humidity, or an ``isError`` payload.
    """
    import os

    location = args.get("location") or "Phoenix, US"

    def _failure(message: str) -> Dict[str, Any]:
        return {"content": [{"type": "text", "text": message}], "isError": True}

    api_key = os.getenv("OPENWEATHERMAP_API_KEY")
    if not api_key:
        return _failure("Error: OPENWEATHERMAP_API_KEY not found in environment variables")

    try:
        base_url = "https://api.openweathermap.org/data/2.5/weather"
        params = {
            "q": location,
            "appid": api_key,
            "units": "imperial",
        }
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(base_url, params=params)
        response.raise_for_status()
        data = response.json()

        main = data["main"]
        weather = data["weather"][0]
        summary = (
            f"Weather in {data['name']}:\n"
            f"Temperature: {main['temp']}°F (feels like {main['feels_like']}°F)\n"
            f"Conditions: {weather['main']} - {weather['description']}\n"
            f"Humidity: {main['humidity']}%"
        )
        return {"content": [{"type": "text", "text": summary}]}
    except httpx.HTTPStatusError as e:
        # The exception text contains the full URL (with appid); report the
        # status code only.
        return _failure(f"Error getting weather: HTTP {e.response.status_code}")
    except httpx.HTTPError as e:
        # Transport errors: report the error type, not text that may embed the URL.
        return _failure(f"Error getting weather: {type(e).__name__}")
    except Exception as e:
        return _failure(f"Error getting weather: {str(e)}")
@tool(
    name="send_email",
    description="Send an email via Gmail API. Requires prior OAuth setup (--setup-google).",
    input_schema={"to": str, "subject": str, "body": str, "cc": str, "reply_to_message_id": str},
)
async def send_email(args: Dict[str, Any]) -> Dict[str, Any]:
    """Send a message through the Gmail client and report the outcome.

    ``to``, ``subject`` and ``body`` are read as mandatory keys of ``args``;
    ``cc`` and ``reply_to_message_id`` are optional and default to None.
    Returns an MCP text result; ``isError`` is set when Google is not
    authorized or the client reports a failure.
    """
    recipient = args["to"]
    subject_line = args["subject"]
    message_body = args["body"]
    carbon_copy = args.get("cc")
    parent_message = args.get("reply_to_message_id")

    gmail, _, _ = _initialize_google_clients()
    if not gmail:
        not_authorized = "Error: Google not authorized. Run: python bot_runner.py --setup-google"
        return {"content": [{"type": "text", "text": not_authorized}], "isError": True}

    outcome = gmail.send_email(
        to=recipient,
        subject=subject_line,
        body=message_body,
        cc=carbon_copy,
        reply_to_message_id=parent_message,
    )

    if not outcome["success"]:
        failure = f"Error sending email: {outcome.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": failure}], "isError": True}

    sent_id = outcome.get("message_id", "unknown")
    confirmation = "\n".join([
        f"Email sent successfully to {recipient}",
        f"Message ID: {sent_id}",
        f"Subject: {subject_line}",
    ])
    return {"content": [{"type": "text", "text": confirmation}], "isError": False}
@tool(
    name="read_emails",
    description="Search and read emails from Gmail using Gmail query syntax (e.g., 'from:user@example.com after:2026/02/10').",
    input_schema={"query": str, "max_results": int, "include_body": bool},
)
async def read_emails(args: Dict[str, Any]) -> Dict[str, Any]:
    """Run a Gmail search and return the client's summary text.

    Optional ``args`` keys: ``query`` (default ""), ``max_results``
    (default 10) and ``include_body`` (default False). The summary is cut
    to ``_MAX_TOOL_OUTPUT`` characters before being returned.
    """
    gmail, _, _ = _initialize_google_clients()
    if not gmail:
        not_authorized = "Error: Google not authorized. Run: python bot_runner.py --setup-google"
        return {"content": [{"type": "text", "text": not_authorized}], "isError": True}

    outcome = gmail.search_emails(
        query=args.get("query", ""),
        max_results=args.get("max_results", 10),
        include_body=args.get("include_body", False),
    )

    if not outcome["success"]:
        failure = f"Error reading emails: {outcome.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": failure}], "isError": True}

    listing = outcome.get("summary", "")
    if len(listing) > _MAX_TOOL_OUTPUT:
        # Cap the payload so a large mailbox cannot flood the context.
        listing = listing[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
    return {"content": [{"type": "text", "text": listing}], "isError": False}
@tool(
    name="get_email",
    description="Get full content of a specific email by its Gmail message ID.",
    input_schema={"message_id": str, "format_type": str},
)
async def get_email(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch one email by message ID and render its headers and body as text.

    ``message_id`` is a mandatory key of ``args``; ``format_type`` defaults
    to "text". The rendered text is cut to ``_MAX_TOOL_OUTPUT`` characters.
    """
    wanted_id = args["message_id"]
    wanted_format = args.get("format_type", "text")

    gmail, _, _ = _initialize_google_clients()
    if not gmail:
        not_authorized = "Error: Google not authorized. Run: python bot_runner.py --setup-google"
        return {"content": [{"type": "text", "text": not_authorized}], "isError": True}

    outcome = gmail.get_email(message_id=wanted_id, format_type=wanted_format)
    if not outcome["success"]:
        failure = f"Error getting email: {outcome.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": failure}], "isError": True}

    message = outcome.get("email", {})
    rendered = "\n".join([
        f"From: {message.get('from', 'Unknown')}",
        f"To: {message.get('to', 'Unknown')}",
        f"Subject: {message.get('subject', 'No subject')}",
        f"Date: {message.get('date', 'Unknown')}",
        "",  # blank line between the headers and the body
        f"{message.get('body', 'No content')}",
    ])
    if len(rendered) > _MAX_TOOL_OUTPUT:
        rendered = rendered[:_MAX_TOOL_OUTPUT] + "\n... (content truncated)"
    return {"content": [{"type": "text", "text": rendered}], "isError": False}
@tool(
    name="read_calendar",
    description="Read upcoming events from Google Calendar. Shows events from today onwards.",
    input_schema={"days_ahead": int, "calendar_id": str, "max_results": int},
)
async def read_calendar(args: Dict[str, Any]) -> Dict[str, Any]:
    """List upcoming events via the calendar client.

    Optional ``args`` keys: ``days_ahead`` (default 7), ``calendar_id``
    (default "primary") and ``max_results`` (default 20). The client's
    summary is cut to ``_MAX_TOOL_OUTPUT`` characters and prefixed with a
    heading naming the look-ahead window.
    """
    window_days = args.get("days_ahead", 7)

    _, calendar, _ = _initialize_google_clients()
    if not calendar:
        not_authorized = "Error: Google not authorized. Run: python bot_runner.py --setup-google"
        return {"content": [{"type": "text", "text": not_authorized}], "isError": True}

    outcome = calendar.list_events(
        days_ahead=window_days,
        calendar_id=args.get("calendar_id", "primary"),
        max_results=args.get("max_results", 20),
    )
    if not outcome["success"]:
        failure = f"Error reading calendar: {outcome.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": failure}], "isError": True}

    agenda = outcome.get("summary", "No events found")
    if len(agenda) > _MAX_TOOL_OUTPUT:
        agenda = agenda[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
    report = f"Upcoming events (next {window_days} days):\n\n{agenda}"
    return {"content": [{"type": "text", "text": report}], "isError": False}
@tool(
    name="create_calendar_event",
    description="Create a new event in Google Calendar. Use ISO 8601 format for times.",
    input_schema={
        "summary": str, "start_time": str, "end_time": str,
        "description": str, "location": str, "calendar_id": str,
    },
)
async def create_calendar_event(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a calendar event and return a confirmation text.

    ``summary``, ``start_time`` and ``end_time`` are mandatory keys of
    ``args``. ``description`` and ``location`` default to "" and
    ``calendar_id`` to "primary".
    """
    title = args["summary"]
    starts_at = args["start_time"]
    ends_at = args["end_time"]
    details = args.get("description", "")
    place = args.get("location", "")
    target_calendar = args.get("calendar_id", "primary")

    _, calendar, _ = _initialize_google_clients()
    if not calendar:
        not_authorized = "Error: Google not authorized. Run: python bot_runner.py --setup-google"
        return {"content": [{"type": "text", "text": not_authorized}], "isError": True}

    outcome = calendar.create_event(
        summary=title,
        start_time=starts_at,
        end_time=ends_at,
        description=details,
        location=place,
        calendar_id=target_calendar,
    )
    if not outcome["success"]:
        failure = f"Error creating calendar event: {outcome.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": failure}], "isError": True}

    confirmation = "\n".join([
        "Calendar event created successfully!",
        f"Title: {title}",
        # Fall back to the requested start when the client does not echo one.
        f"Start: {outcome.get('start', starts_at)}",
        f"Event ID: {outcome.get('event_id', 'unknown')}",
        f"Link: {outcome.get('html_link', '')}",
    ])
    return {"content": [{"type": "text", "text": confirmation}], "isError": False}
@tool(
    name="search_calendar",
    description="Search calendar events by text query. Searches event titles and descriptions.",
    input_schema={"query": str, "calendar_id": str},
)
async def search_calendar(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search calendar events for a text query via the calendar client.

    ``query`` is a mandatory key of ``args``; ``calendar_id`` defaults to
    "primary". The client's summary is cut to ``_MAX_TOOL_OUTPUT``
    characters and prefixed with a heading quoting the query.
    """
    needle = args["query"]
    target_calendar = args.get("calendar_id", "primary")

    _, calendar, _ = _initialize_google_clients()
    if not calendar:
        not_authorized = "Error: Google not authorized. Run: python bot_runner.py --setup-google"
        return {"content": [{"type": "text", "text": not_authorized}], "isError": True}

    outcome = calendar.search_events(query=needle, calendar_id=target_calendar)
    if not outcome["success"]:
        failure = f"Error searching calendar: {outcome.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": failure}], "isError": True}

    matches = outcome.get("summary", "No events found")
    if len(matches) > _MAX_TOOL_OUTPUT:
        matches = matches[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
    report = f"Calendar search results for '{needle}':\n\n{matches}"
    return {"content": [{"type": "text", "text": report}], "isError": False}
# ============================================
# Contacts Tools (MCP)
# ============================================
@tool(
    name="create_contact",
    description="Create a new Google contact. Requires prior OAuth setup (--setup-google).",
    input_schema={"given_name": str, "family_name": str, "email": str, "phone": str, "notes": str},
)
async def create_contact(args: Dict[str, Any]) -> Dict[str, Any]:
    """Create a Google contact through the People client.

    ``given_name`` is a mandatory key of ``args``. ``family_name`` and
    ``email`` default to ""; ``phone`` and ``notes`` default to None.
    """
    first_name = args["given_name"]
    last_name = args.get("family_name", "")
    email_address = args.get("email", "")
    phone_number = args.get("phone")
    free_text = args.get("notes")

    _, _, people = _initialize_google_clients()
    if not people:
        not_authorized = "Error: Google not authorized. Run: python bot_runner.py --setup-google"
        return {"content": [{"type": "text", "text": not_authorized}], "isError": True}

    outcome = people.create_contact(
        given_name=first_name,
        family_name=last_name,
        email=email_address,
        phone=phone_number,
        notes=free_text,
    )
    if not outcome["success"]:
        failure = f"Error creating contact: {outcome.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": failure}], "isError": True}

    # Fall back to the supplied given name when the client returns no name.
    shown_name = outcome.get("name", first_name)
    resource_ref = outcome.get("resource_name", "")
    confirmation = f"Contact created: {shown_name}\nResource: {resource_ref}"
    return {"content": [{"type": "text", "text": confirmation}], "isError": False}
@tool(
    name="list_contacts",
    description="List or search Google contacts. Without a query, lists all contacts sorted by last name.",
    input_schema={"max_results": int, "query": str},
)
async def list_contacts(args: Dict[str, Any]) -> Dict[str, Any]:
    """List contacts, optionally filtered by a query, via the People client.

    Optional ``args`` keys: ``max_results`` (default 100) and ``query``
    (default None). The client's summary is cut to ``_MAX_TOOL_OUTPUT``
    characters and prefixed with the reported match count.
    """
    _, _, people = _initialize_google_clients()
    if not people:
        not_authorized = "Error: Google not authorized. Run: python bot_runner.py --setup-google"
        return {"content": [{"type": "text", "text": not_authorized}], "isError": True}

    outcome = people.list_contacts(
        max_results=args.get("max_results", 100),
        query=args.get("query"),
    )
    if not outcome["success"]:
        failure = f"Error listing contacts: {outcome.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": failure}], "isError": True}

    roster = outcome.get("summary", "No contacts found.")
    if len(roster) > _MAX_TOOL_OUTPUT:
        roster = roster[:_MAX_TOOL_OUTPUT] + "\n... (results truncated)"
    found = outcome.get("count", 0)
    report = f"Contacts ({found} found):\n\n{roster}"
    return {"content": [{"type": "text", "text": report}], "isError": False}
@tool(
    name="get_contact",
    description="Get full details of a specific Google contact by resource name.",
    input_schema={"resource_name": str},
)
async def get_contact(args: Dict[str, Any]) -> Dict[str, Any]:
    """Look up one contact by resource name and render its fields as text.

    ``resource_name`` is a mandatory key of ``args``. Email, phone and
    notes lines are emitted only when the contact has a truthy value for
    them; the name and resource lines are always present.
    """
    requested = args["resource_name"]

    _, _, people = _initialize_google_clients()
    if not people:
        not_authorized = "Error: Google not authorized. Run: python bot_runner.py --setup-google"
        return {"content": [{"type": "text", "text": not_authorized}], "isError": True}

    outcome = people.get_contact(resource_name=requested)
    if not outcome["success"]:
        failure = f"Error getting contact: {outcome.get('error', 'Unknown error')}"
        return {"content": [{"type": "text", "text": failure}], "isError": True}

    contact = outcome.get("contact", {})
    # Prefer the display name; otherwise join given and family names.
    display = contact.get("display_name")
    if not display:
        display = f"{contact.get('given_name', '')} {contact.get('family_name', '')}".strip()

    rows = [f"Name: {display or '(no name)'}"]
    optional_fields = (("Email", "email"), ("Phone", "phone"), ("Notes", "notes"))
    rows.extend(
        f"{label}: {contact[key]}"
        for label, key in optional_fields
        if contact.get(key)
    )
    rows.append(f"Resource: {contact.get('resource_name', requested)}")
    return {"content": [{"type": "text", "text": "\n".join(rows)}], "isError": False}
# ============================================
# Gitea Tools (MCP) - Private repo access
# ============================================
# Lazy-loaded Gitea client
# Module-level cache for the Gitea client; populated on first successful use.
_gitea_client: Optional[Any] = None


def _get_gitea_client():
    """Return the cached Gitea client, creating it on first use.

    The ``gitea_tools.client`` import is deferred until the first call.
    Any exception raised while importing or constructing the client is
    printed and reported to the caller as ``None``.
    """
    global _gitea_client
    if _gitea_client is None:
        try:
            from gitea_tools.client import get_gitea_client
            _gitea_client = get_gitea_client()
        except Exception as e:
            print(f"[MCP Gitea] Failed to initialize: {e}")
            return None
    return _gitea_client
@tool(
    name="gitea_read_file",
    description="Read a file from a Gitea repository. Use this to access files from Jordan's homelab repo or any configured Gitea repo. Returns the file content as text.",
    input_schema={
        "file_path": str,
        "repo": str,
        "branch": str,
    },
)
async def gitea_read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch a file's content from a Gitea repository.

    ``file_path`` must be non-empty. ``repo`` may be given as
    "owner/name", in which case it is split on the first "/" and both
    parts are passed to the client; ``branch`` is optional. On success the
    text is a header line (path, size, truncation marker) followed by the
    file content.
    """
    target_path = args.get("file_path", "")
    repo_name = args.get("repo")
    ref = args.get("branch")

    if not target_path:
        return {
            "content": [{"type": "text", "text": "Error: file_path is required"}],
            "isError": True,
        }

    gitea = _get_gitea_client()
    if not gitea:
        not_configured = (
            "Error: Gitea not configured. "
            "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
            "and add your Personal Access Token."
        )
        return {
            "content": [{"type": "text", "text": not_configured}],
            "isError": True,
        }

    # Accept "owner/name" as well as a bare repository name.
    repo_owner = None
    if repo_name and "/" in repo_name:
        repo_owner, repo_name = repo_name.split("/", 1)

    outcome = await gitea.get_file_content(
        file_path=target_path,
        owner=repo_owner,
        repo=repo_name,
        branch=ref,
    )

    if not outcome["success"]:
        return {
            "content": [{"type": "text", "text": f"Error: {outcome['error']}"}],
            "isError": True,
        }

    body = outcome["content"]
    info = outcome.get("metadata", {})
    shown_path = info.get("path", target_path)
    byte_count = info.get("size", 0)
    banner = f"File: {shown_path} ({byte_count:,} bytes)"
    if info.get("truncated"):
        banner += " [TRUNCATED]"
    return {
        "content": [{"type": "text", "text": f"{banner}\n\n{body}"}],
    }
@tool(
    name="gitea_list_files",
    description="List files and folders in a directory in a Gitea repository. Use this to explore the structure of Jordan's homelab repo or any configured Gitea repo.",
    input_schema={
        "path": str,
        "repo": str,
        "branch": str,
    },
)
async def gitea_list_files_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """List files and directories at a path in a Gitea repository.

    Args:
        args: Tool arguments. ``path`` defaults to ""; ``repo`` may be
            given as "owner/name", in which case it is split on the first
            "/" and both parts are passed to the client; ``branch`` is
            optional.

    Returns:
        An MCP tool result dict. On success the text is a header line
        followed by one "DIR" or "FILE" line per entry; file lines carry a
        size suffix when the entry has a truthy size. On failure the text
        holds the client's error and ``isError`` is True.
    """
    path = args.get("path", "")
    repo = args.get("repo")
    branch = args.get("branch")

    client = _get_gitea_client()
    if not client:
        return {
            "content": [{
                "type": "text",
                "text": (
                    "Error: Gitea not configured. "
                    "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
                    "and add your Personal Access Token."
                ),
            }],
            "isError": True,
        }

    # Parse owner/repo if provided
    owner = None
    if repo and "/" in repo:
        owner, repo = repo.split("/", 1)

    result = await client.list_files(
        path=path,
        owner=owner,
        repo=repo,
        branch=branch,
    )

    if not result["success"]:
        # .get() keeps a failure result without an 'error' key from raising
        # KeyError and masking the real failure.
        return {
            "content": [{"type": "text", "text": f"Error: {result.get('error', 'Unknown error')}"}],
            "isError": True,
        }

    repo_name = result.get("repo", "")
    display_path = result.get("path", "/")
    count = result.get("count", 0)

    # Format output
    lines = [f"Directory: {repo_name}/{display_path} ({count} items)\n"]
    for entry in result["files"]:
        if entry["type"] == "dir":
            lines.append(f"DIR {entry['name']}/")
        else:
            # Entries may omit 'size'; treat a missing size like an empty one.
            size = entry.get("size")
            size_str = f" ({size:,} bytes)" if size else ""
            lines.append(f"FILE {entry['name']}{size_str}")

    return {
        "content": [{"type": "text", "text": "\n".join(lines)}],
    }
@tool(
    name="gitea_search_code",
    description="Search for files by name/path in a Gitea repository. Searches file and directory names. For content search, use gitea_read_file on specific files.",
    input_schema={
        "query": str,
        "repo": str,
    },
)
async def gitea_search_code_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Search a Gitea repository through the client's ``search_code`` call.

    ``query`` must be non-empty. ``repo`` may be given as "owner/name", in
    which case it is split on the first "/" and both parts are passed to
    the client. With no hits, the client's ``message`` (or a default "No
    results" text) is returned as a non-error result.
    """
    needle = args.get("query", "")
    repo_arg = args.get("repo")

    if not needle:
        return {
            "content": [{"type": "text", "text": "Error: query is required"}],
            "isError": True,
        }

    gitea = _get_gitea_client()
    if not gitea:
        not_configured = (
            "Error: Gitea not configured. "
            "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
            "and add your Personal Access Token."
        )
        return {
            "content": [{"type": "text", "text": not_configured}],
            "isError": True,
        }

    # Accept "owner/name" as well as a bare repository name.
    repo_owner = None
    if repo_arg and "/" in repo_arg:
        repo_owner, repo_arg = repo_arg.split("/", 1)

    outcome = await gitea.search_code(
        query=needle,
        owner=repo_owner,
        repo=repo_arg,
    )

    if not outcome["success"]:
        return {
            "content": [{"type": "text", "text": f"Error: {outcome['error']}"}],
            "isError": True,
        }

    hits = outcome.get("results", [])
    if not hits:
        nothing_found = outcome.get("message", f"No results for '{needle}'")
        return {
            "content": [{"type": "text", "text": nothing_found}],
        }

    hit_count = outcome.get("count", 0)
    searched_repo = outcome.get("repo", "")
    rows = [f"Search results for '{needle}' in {searched_repo} ({hit_count} matches):\n"]
    for hit in hits:
        label = "FILE" if hit["type"] != "dir" else "DIR"
        suffix = f" ({hit['size']:,} bytes)" if hit.get("size") else ""
        rows.append(f"{label} {hit['path']}{suffix}")

    return {
        "content": [{"type": "text", "text": "\n".join(rows)}],
    }
@tool(
    name="gitea_get_tree",
    description="Get the directory tree structure from a Gitea repository. Shows all files and folders. Use recursive=true for the full tree.",
    input_schema={
        "repo": str,
        "branch": str,
        "recursive": bool,
    },
)
async def gitea_get_tree_tool(args: Dict[str, Any]) -> Dict[str, Any]:
    """Render the tree of a Gitea repository as text.

    ``repo`` may be given as "owner/name", in which case it is split on
    the first "/" and both parts are passed to the client; ``branch`` is
    optional and ``recursive`` defaults to False. The rendered listing is
    cut to ``_MAX_TOOL_OUTPUT`` characters.
    """
    repo_arg = args.get("repo")
    ref = args.get("branch")
    walk_all = args.get("recursive", False)

    gitea = _get_gitea_client()
    if not gitea:
        not_configured = (
            "Error: Gitea not configured. "
            "Copy config/gitea_config.example.yaml to config/gitea_config.yaml "
            "and add your Personal Access Token."
        )
        return {
            "content": [{"type": "text", "text": not_configured}],
            "isError": True,
        }

    # Accept "owner/name" as well as a bare repository name.
    repo_owner = None
    if repo_arg and "/" in repo_arg:
        repo_owner, repo_arg = repo_arg.split("/", 1)

    outcome = await gitea.get_tree(
        owner=repo_owner,
        repo=repo_arg,
        branch=ref,
        recursive=walk_all,
    )

    if not outcome["success"]:
        return {
            "content": [{"type": "text", "text": f"Error: {outcome['error']}"}],
            "isError": True,
        }

    shown_repo = outcome.get("repo", "")
    shown_branch = outcome.get("branch", "main")
    entry_total = outcome.get("total", 0)
    banner = f"Tree: {shown_repo} (branch: {shown_branch}, {entry_total} entries)"
    if outcome.get("truncated", False):
        banner += " [TRUNCATED - tree too large]"

    # Header, a blank separator line, then one line per tree entry.
    rows = [banner, ""]
    for node in outcome.get("entries", []):
        if node["type"] != "dir":
            suffix = f" ({node['size']:,} bytes)" if node.get("size") else ""
            rows.append(f"{node['path']}{suffix}")
        else:
            rows.append(f"{node['path']}/")

    # Cap the rendered tree so a large repository cannot flood the context.
    rendered = "\n".join(rows)
    if len(rendered) > _MAX_TOOL_OUTPUT:
        rendered = (
            rendered[:_MAX_TOOL_OUTPUT]
            + "\n\n... (tree truncated, use gitea_list_files for specific directories)"
        )

    return {
        "content": [{"type": "text", "text": rendered}],
    }
# Build the in-process MCP server and register every tool with it.
file_system_server = create_sdk_mcp_server(
    name="file_system",
    version="2.0.0",
    tools=[
        # File and system tools
        read_file_tool, write_file_tool, edit_file_tool,
        list_directory_tool, run_command_tool,
        # Web tool
        web_fetch_tool,
        # Zettelkasten tools
        fleeting_note_tool, daily_note_tool, literature_note_tool,
        permanent_note_tool, search_vault_tool, search_by_tags_tool,
        # Weather
        get_weather,
        # Gmail tools
        send_email, read_emails, get_email,
        # Calendar tools
        read_calendar, create_calendar_event, search_calendar,
        # Contacts tools
        create_contact, list_contacts, get_contact,
        # Gitea tools
        gitea_read_file_tool, gitea_list_files_tool,
        gitea_search_code_tool, gitea_get_tree_tool,
    ],
)