Add MCP delegation bridge and diagram tools

**Features Added**:

1. **Agent Registry (agent_registry.py)**
   - Thread-safe global singleton for MCP tool access to Agent instance
   - Enables MCP tools to call Agent.delegate() without circular imports
   - Registered at bot startup in bot_runner.py

2. **Sub-Agent Manager (sub_agent_manager.py)**
   - Watchdog system monitoring sub-agent lifecycle
   - Detects hung agents (5min timeout, 30s check interval)
   - Auto-cleanup and status tracking

3. **delegate_task MCP Tool (mcp_tools.py)**
   - Exposes Agent.delegate() to Claude via MCP protocol
   - Enables parallel sub-agent execution via tool calls
   - Supports specialist prompts and agent ID caching

4. **Memory Write Locks (memory_system.py)**
   - Thread-safe writes to prevent file corruption
   - Protects write_memory(), update_soul(), update_user()

5. **Diagram Tools**
   - Mermaid MCP server (flowcharts, sequence diagrams, etc.)
   - Excalidraw MCP server (hand-drawn style diagrams)
   - Config files in config/ directory

6. **Adapter Improvements**
   - Enhanced error handling across all adapters
   - Unified logging patterns

**Testing**: Ready for parallel sub-agent testing

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-03-01 14:34:24 -07:00
parent dd5beb11c2
commit e909cc0044
13 changed files with 1081 additions and 26 deletions

View File

@@ -145,6 +145,26 @@ class BaseAdapter(ABC):
async def send_typing_indicator(self, channel_id: str) -> None:
"""Show typing indicator. Optional."""
async def send_file(
self,
channel_id: str,
file_path: str,
caption: Optional[str] = None,
thread_id: Optional[str] = None
) -> Dict[str, Any]:
"""Send a file attachment to the platform. Optional - override if supported.
Args:
channel_id: Channel/chat ID to send to
file_path: Absolute path to file
caption: Optional caption/text with the file
thread_id: Optional thread/reply ID
Returns:
Dict with success status and message_id or error
"""
return {"success": False, "error": "send_file not implemented"}
async def health_check(self) -> Dict[str, Any]:
"""Perform health check on the adapter."""
return {

View File

@@ -98,6 +98,61 @@ class AdapterRuntime:
print("[Runtime] Warning: No event loop for message dispatch")
self._message_queue.put_nowait(message)
async def _detect_and_send_diagrams(
self,
response: str,
adapter: BaseAdapter,
channel_id: str,
thread_id: Optional[str]
) -> None:
"""Detect diagram file paths in response and auto-send them.
Args:
response: Agent's text response
adapter: Platform adapter to send files with
channel_id: Channel/chat ID
thread_id: Thread/message ID for replies
"""
import re
from pathlib import Path
# Match diagram file paths: "Saved to: path/to/diagram.png"
# Pattern matches common phrases followed by file path with image/diagram extensions
pattern = r"(?:Saved|Created|Generated|Exported|File saved|Output file)\s*(?:to|at)?[:\s]+([^\s]+\.(?:png|svg|pdf|jpg|jpeg))"
matches = re.findall(pattern, response, re.IGNORECASE)
if not matches:
return
sent_files = []
for file_path_str in matches:
try:
file_path = Path(file_path_str)
# Check if file exists
if not file_path.exists():
print(f"[Runtime] Diagram file not found: {file_path}")
continue
# Send file via adapter
result = await adapter.send_file(
channel_id=channel_id,
file_path=str(file_path.absolute()),
caption=f"Diagram: {file_path.name}",
thread_id=thread_id,
)
if result.get("success"):
sent_files.append(file_path.name)
print(f"[Runtime] Sent diagram file: {file_path.name}")
else:
print(f"[Runtime] Failed to send diagram: {result.get('error')}")
except Exception as e:
print(f"[Runtime] Error sending diagram file: {e}")
if sent_files:
print(f"[Runtime] Successfully sent {len(sent_files)} diagram file(s)")
async def _process_message_queue(self) -> None:
"""Background task to process incoming messages."""
print("[Runtime] Message processing loop started")
@@ -169,12 +224,22 @@ class AdapterRuntime:
user_message=processed_message.text,
username=username,
progress_callback=progress_callback,
inbound_message=processed_message,
)
# Apply postprocessors
for postprocessor in self._postprocessors:
response = postprocessor(response, processed_message)
# NEW: Detect and send diagram files mentioned in response
if adapter:
await self._detect_and_send_diagrams(
response,
adapter,
message.channel_id,
message.thread_id,
)
# Send response back
if adapter:
reply_to = (

View File

@@ -58,19 +58,29 @@ class SlackAdapter(BaseAdapter):
)
def validate_config(self) -> bool:
"""Validate Slack configuration."""
"""Validate Slack configuration.
Required scopes for bot token:
- files:read (for downloading file attachments)
- files:write (for uploading files - future feature)
"""
if not self.config.credentials:
return False
bot_token = self.config.credentials.get("bot_token", "")
app_token = self.config.credentials.get("app_token", "")
return (
valid = (
bool(bot_token and app_token)
and bot_token.startswith("xoxb-")
and app_token.startswith("xapp-")
)
if valid:
print("[Slack] ✓ Config valid. Ensure bot has 'files:read' and 'files:write' scopes at api.slack.com")
return valid
async def start(self) -> None:
"""Start the Slack Socket Mode connection."""
if not self.validate_config():
@@ -116,9 +126,40 @@ class SlackAdapter(BaseAdapter):
channel = event.get("channel")
thread_ts = event.get("thread_ts")
ts = event.get("ts")
files = event.get("files", [])
# DEBUG: Log full event structure
print(f"[Slack DEBUG] Event subtype: {event.get('subtype')}")
print(f"[Slack DEBUG] Event has text: {bool(text)}, text length: {len(text)}")
print(f"[Slack DEBUG] Event has files: {bool(files)}, file count: {len(files)}")
# DEBUG: Log file detection
if files:
print(f"[Slack DEBUG] Detected {len(files)} file(s) in message")
for f in files:
print(f"[Slack DEBUG] File: {f.get('name')} ({f.get('mimetype')}, ID: {f.get('id')})")
username = await self._get_username(user_id)
# Determine message type
message_type = MessageType.FILE if files else MessageType.TEXT
# Download files
downloaded_files = []
for file_info in files:
print(f"[Slack DEBUG] Downloading: {file_info.get('name')} (ID: {file_info.get('id')})")
result = await self._download_slack_file(file_info)
if result["success"]:
print(f"[Slack DEBUG] Downloaded to: {result['file_path']}")
downloaded_files.append(result)
else:
print(f"[Slack] Failed to download file {file_info.get('name')}: {result['error']}")
# If files but no text, add placeholder
if files and not text:
file_names = ", ".join(f["filename"] for f in downloaded_files)
text = f"[Uploaded {len(downloaded_files)} file(s): {file_names}]"
inbound_msg = InboundMessage(
platform="slack",
user_id=user_id,
@@ -127,11 +168,12 @@ class SlackAdapter(BaseAdapter):
channel_id=channel,
thread_id=thread_ts,
reply_to_id=None,
message_type=MessageType.TEXT,
message_type=message_type,
metadata={
"ts": ts,
"team": event.get("team"),
"channel_type": event.get("channel_type"),
"files": downloaded_files,
},
raw=event,
)
@@ -146,9 +188,35 @@ class SlackAdapter(BaseAdapter):
channel = event.get("channel")
thread_ts = event.get("thread_ts")
ts = event.get("ts")
files = event.get("files", [])
# DEBUG: Log file detection
if files:
print(f"[Slack DEBUG @mention] Detected {len(files)} file(s)")
for f in files:
print(f"[Slack DEBUG @mention] File: {f.get('name')} ({f.get('mimetype')})")
username = await self._get_username(user_id)
# Determine message type
message_type = MessageType.FILE if files else MessageType.TEXT
# Download files
downloaded_files = []
for file_info in files:
print(f"[Slack DEBUG @mention] Downloading: {file_info.get('name')} (ID: {file_info.get('id')})")
result = await self._download_slack_file(file_info)
if result["success"]:
print(f"[Slack DEBUG @mention] Downloaded to: {result['file_path']}")
downloaded_files.append(result)
else:
print(f"[Slack @mention] Failed to download file {file_info.get('name')}: {result['error']}")
# If files but no text (after stripping mention), add placeholder
if files and not text:
file_names = ", ".join(f["filename"] for f in downloaded_files)
text = f"[Uploaded {len(downloaded_files)} file(s): {file_names}]"
inbound_msg = InboundMessage(
platform="slack",
user_id=user_id,
@@ -157,17 +225,88 @@ class SlackAdapter(BaseAdapter):
channel_id=channel,
thread_id=thread_ts,
reply_to_id=None,
message_type=MessageType.TEXT,
message_type=message_type,
metadata={
"ts": ts,
"mentioned": True,
"team": event.get("team"),
"files": downloaded_files,
},
raw=event,
)
self._dispatch_message(inbound_msg)
async def _download_slack_file(
self,
file_info: Dict[str, Any],
output_dir: str = "downloads/slack"
) -> Dict[str, Any]:
"""Download a file from Slack using url_private_download.
Args:
file_info: File object from Slack event (contains url_private_download, name, etc.)
output_dir: Directory to save files (default: "downloads/slack")
Returns:
Dict with success, file_path, filename, mimetype, size, or error
"""
import aiohttp
from pathlib import Path
from datetime import datetime
url = file_info.get("url_private_download")
token = self.config.credentials["bot_token"]
headers = {"Authorization": f"Bearer {token}"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=headers) as response:
if response.status == 403:
return {
"success": False,
"error": "Permission denied. Add 'files:read' scope to bot at api.slack.com → OAuth & Permissions → Bot Token Scopes"
}
elif response.status == 404:
return {"success": False, "error": "File not found or expired"}
elif response.status != 200:
return {"success": False, "error": f"HTTP {response.status}"}
content_type = response.headers.get("Content-Type", "")
file_data = await response.read()
# Detect HTML login page (auth failure)
if content_type.startswith("text/html") or file_data.startswith(b"<!DOCTYPE") or file_data.startswith(b"<html"):
print(f"[Slack] Auth failure: Got HTML instead of file (likely missing 'files:read' scope)")
return {
"success": False,
"error": "Authentication failed. Bot needs 'files:read' scope. Add it at api.slack.com → OAuth & Permissions → Scopes → Add files:read → Reinstall to Workspace"
}
# Save to disk
Path(output_dir).mkdir(parents=True, exist_ok=True)
safe_name = Path(file_info["name"]).name # Prevent path traversal
file_path = Path(output_dir) / safe_name
# Handle duplicates with timestamp
if file_path.exists():
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
stem, suffix = safe_name.rsplit(".", 1) if "." in safe_name else (safe_name, "")
safe_name = f"{stem}_{timestamp}.{suffix}" if suffix else f"{stem}_{timestamp}"
file_path = Path(output_dir) / safe_name
file_path.write_bytes(file_data)
return {
"success": True,
"file_path": str(file_path.absolute()),
"filename": safe_name,
"mimetype": file_info.get("mimetype", ""),
"size": len(file_data)
}
except Exception as e:
return {"success": False, "error": str(e)}
async def send_message(
self, message: OutboundMessage
) -> Dict[str, Any]:
@@ -256,6 +395,45 @@ class SlackAdapter(BaseAdapter):
"error": str(e.response.get("error")),
}
async def send_file(
self,
channel_id: str,
file_path: str,
caption: Optional[str] = None,
thread_id: Optional[str] = None
) -> Dict[str, Any]:
"""Upload a file to Slack channel."""
if not self.app:
return {"success": False, "error": "Adapter not started"}
try:
from pathlib import Path
path = Path(file_path)
if not path.exists():
return {"success": False, "error": f"File not found: {file_path}"}
result = await self.app.client.files_upload_v2(
channel=channel_id,
file=str(path.absolute()),
title=path.name,
initial_comment=caption or "",
thread_ts=thread_id,
)
return {
"success": True,
"message_id": result["file"]["id"],
"file_path": file_path,
}
except SlackApiError as e:
error_msg = e.response["error"]
print(f"[Slack] Error uploading file: {error_msg}")
return {"success": False, "error": error_msg}
except Exception as e:
print(f"[Slack] Unexpected error uploading file: {e}")
return {"success": False, "error": str(e)}
async def _get_username(self, user_id: str) -> str:
"""Get username from user ID, with caching to avoid excessive API calls.

View File

@@ -306,6 +306,58 @@ class TelegramAdapter(BaseAdapter):
except TelegramError as e:
print(f"[Telegram] Error sending typing indicator: {e}")
async def send_file(
self,
channel_id: str,
file_path: str,
caption: Optional[str] = None,
thread_id: Optional[str] = None
) -> Dict[str, Any]:
"""Send a file (image or document) to Telegram."""
if not self.bot:
return {"success": False, "error": "Bot not started"}
try:
from pathlib import Path
path = Path(file_path)
if not path.exists():
return {"success": False, "error": f"File not found: {file_path}"}
chat_id = int(channel_id)
reply_to = int(thread_id) if thread_id else None
ext = path.suffix.lower()
# Send as photo for images, document for others
if ext in [".png", ".jpg", ".jpeg", ".gif", ".webp"]:
with open(path, "rb") as photo:
sent = await self.bot.send_photo(
chat_id=chat_id,
photo=photo,
caption=caption,
reply_to_message_id=reply_to,
)
else:
with open(path, "rb") as document:
sent = await self.bot.send_document(
chat_id=chat_id,
document=document,
caption=caption,
reply_to_message_id=reply_to,
)
return {
"success": True,
"message_id": sent.message_id,
"file_path": file_path,
}
except TelegramError as e:
print(f"[Telegram] Error sending file: {e}")
return {"success": False, "error": str(e)}
except Exception as e:
print(f"[Telegram] Unexpected error sending file: {e}")
return {"success": False, "error": str(e)}
async def health_check(self) -> Dict[str, Any]:
"""Perform health check."""
base_health = await super().health_check()

76
agent_registry.py Normal file
View File

@@ -0,0 +1,76 @@
"""Agent Registry - Thread-safe global singleton for MCP tool access.
MCP tools are module-level functions that cannot access the Agent instance
directly. This registry provides a thread-safe bridge so that tools like
delegate_task can call Agent.delegate() without circular imports.
Usage:
# At bot startup (bot_runner.py):
from agent_registry import register_agent
agent = Agent(...)
register_agent(agent)
# In MCP tools (mcp_tools.py):
from agent_registry import get_agent
agent = get_agent()
if agent:
result = agent.delegate(task, specialist_prompt)
"""
import threading
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from agent import Agent
# Module-level singleton state
_agent: Optional['Agent'] = None
_lock = threading.Lock()
def register_agent(agent: 'Agent') -> None:
"""Register the main Agent instance for MCP tool access.
Must be called exactly once at bot startup, after Agent is initialized.
Thread-safe.
Args:
agent: The main Agent instance (not a sub-agent).
Raises:
ValueError: If agent is None or is a sub-agent.
"""
global _agent
if agent is None:
raise ValueError("Cannot register None as the main agent")
if getattr(agent, 'is_sub_agent', False):
raise ValueError("Cannot register a sub-agent as the main agent")
with _lock:
_agent = agent
print(f"[AgentRegistry] Main agent registered (provider={agent.llm.provider})")
def get_agent() -> Optional['Agent']:
"""Get the registered main Agent instance.
Thread-safe. Returns None if no agent has been registered yet.
Returns:
The main Agent instance, or None.
"""
with _lock:
return _agent
def clear_agent() -> None:
"""Clear the registered agent (for testing or shutdown).
Thread-safe.
"""
global _agent
with _lock:
_agent = None
print("[AgentRegistry] Agent registry cleared")

View File

@@ -26,6 +26,7 @@ from adapters.runtime import AdapterRuntime
from adapters.slack.adapter import SlackAdapter
from adapters.telegram.adapter import TelegramAdapter
from agent import Agent
from agent_registry import register_agent
from config.config_loader import ConfigLoader
from google_tools.oauth_manager import GoogleOAuthManager
from scheduled_tasks import TaskScheduler
@@ -79,6 +80,9 @@ class BotRunner:
)
print("[Setup] Agent initialized")
# Register agent in global registry for MCP tool access (delegate_task)
register_agent(self.agent)
self.runtime = AdapterRuntime(self.agent)
enabled_count = sum(

View File

@@ -0,0 +1,6 @@
excalidraw_mcp:
enabled: true
output_dir: "downloads/diagrams/excalidraw" # Default location; bot can save elsewhere if requested
default_export_type: "png" # Options: png, svg
canvas_width: 1920
canvas_height: 1080

5
config/mermaid_mcp.yaml Normal file
View File

@@ -0,0 +1,5 @@
mermaid_mcp:
enabled: true
output_dir: "downloads/diagrams/mermaid" # Default location; bot can save elsewhere if requested
default_format: "png" # Options: png, svg, pdf
theme: "default" # Options: default, dark, forest, neutral

View File

@@ -0,0 +1,89 @@
"""Excalidraw MCP Server Integration.
Manages the external excalidraw-mcp server process for hand-drawn style diagram generation.
Architecture:
Garvis → (stdio) → excalidraw-mcp (Node.js) → Canvas API → PNG/SVG images
"""
import os
from pathlib import Path
from typing import Any, Dict, List
import yaml
_CONFIG_FILE = Path("config/excalidraw_mcp.yaml")
def _load_config() -> Dict[str, Any]:
"""Load Excalidraw MCP configuration from YAML and env vars."""
config = {}
if _CONFIG_FILE.exists():
try:
with open(_CONFIG_FILE, encoding="utf-8") as f:
config = yaml.safe_load(f) or {}
except Exception:
pass # Use defaults if config fails to load
excalidraw = config.get("excalidraw_mcp", {})
# Apply env var overrides
if os.getenv("EXCALIDRAW_ENABLED"):
excalidraw["enabled"] = os.getenv("EXCALIDRAW_ENABLED").lower() in ("true", "1")
if os.getenv("EXCALIDRAW_OUTPUT_DIR"):
excalidraw["output_dir"] = os.getenv("EXCALIDRAW_OUTPUT_DIR")
if os.getenv("EXCALIDRAW_DEFAULT_EXPORT"):
excalidraw["default_export_type"] = os.getenv("EXCALIDRAW_DEFAULT_EXPORT")
# Set defaults if not configured
excalidraw.setdefault("enabled", True)
excalidraw.setdefault("output_dir", "downloads/diagrams/excalidraw")
excalidraw.setdefault("default_export_type", "png")
excalidraw.setdefault("canvas_width", 1920)
excalidraw.setdefault("canvas_height", 1080)
return excalidraw
def is_excalidraw_enabled() -> bool:
"""Check if Excalidraw MCP integration is enabled."""
config = _load_config()
return config.get("enabled", True)
def get_excalidraw_server_config() -> Dict[str, Any]:
"""Build the MCP server configuration for Agent SDK registration.
Returns:
Dict with command, args, and env for subprocess execution
"""
config = _load_config()
output_dir = config.get("output_dir", "downloads/diagrams/excalidraw")
# Ensure output directory exists
Path(output_dir).mkdir(parents=True, exist_ok=True)
# Build environment variables for the MCP server
env = {
"PATH": os.environ.get("PATH", ""),
"HOME": os.environ.get("HOME", os.environ.get("USERPROFILE", "")),
"APPDATA": os.environ.get("APPDATA", ""),
"TEMP": os.environ.get("TEMP", os.environ.get("TMP", "")),
}
# Add config as env vars for the server
env["EXCALIDRAW_OUTPUT_DIR"] = str(Path(output_dir).absolute())
env["EXCALIDRAW_DEFAULT_EXPORT"] = config.get("default_export_type", "png")
env["EXCALIDRAW_CANVAS_WIDTH"] = str(config.get("canvas_width", 1920))
env["EXCALIDRAW_CANVAS_HEIGHT"] = str(config.get("canvas_height", 1080))
return {
"command": "npx",
"args": ["-y", "excalidraw-mcp"],
"env": env,
}
# Tool names exposed by excalidraw-mcp
# Based on excalidraw-mcp documentation
EXCALIDRAW_TOOLS: List[str] = [
"create_diagram", # Create new Excalidraw diagram
"add_element", # Add shape/element to diagram
"export_diagram", # Export to PNG/SVG
"list_diagrams", # List saved diagrams
]

View File

@@ -0,0 +1,86 @@
"""Mermaid MCP Server Integration.
Manages the external @peng-shawn/mermaid-mcp-server process for diagram generation.
Architecture:
Garvis → (stdio) → mermaid-mcp-server (Node.js) → Puppeteer → PNG images
"""
import os
from pathlib import Path
from typing import Any, Dict, List
import yaml
_CONFIG_FILE = Path("config/mermaid_mcp.yaml")
def _load_config() -> Dict[str, Any]:
"""Load Mermaid MCP configuration from YAML and env vars."""
config = {}
if _CONFIG_FILE.exists():
try:
with open(_CONFIG_FILE, encoding="utf-8") as f:
config = yaml.safe_load(f) or {}
except Exception:
pass # Use defaults if config fails to load
mermaid = config.get("mermaid_mcp", {})
# Apply env var overrides
if os.getenv("MERMAID_ENABLED"):
mermaid["enabled"] = os.getenv("MERMAID_ENABLED").lower() in ("true", "1")
if os.getenv("MERMAID_OUTPUT_DIR"):
mermaid["output_dir"] = os.getenv("MERMAID_OUTPUT_DIR")
if os.getenv("MERMAID_DEFAULT_FORMAT"):
mermaid["default_format"] = os.getenv("MERMAID_DEFAULT_FORMAT")
if os.getenv("MERMAID_THEME"):
mermaid["theme"] = os.getenv("MERMAID_THEME")
# Set defaults if not configured
mermaid.setdefault("enabled", True)
mermaid.setdefault("output_dir", "downloads/diagrams/mermaid")
mermaid.setdefault("default_format", "png")
mermaid.setdefault("theme", "default")
return mermaid
def is_mermaid_enabled() -> bool:
"""Check if Mermaid MCP integration is enabled."""
config = _load_config()
return config.get("enabled", True)
def get_mermaid_server_config() -> Dict[str, Any]:
"""Build the MCP server configuration for Agent SDK registration.
Returns:
Dict with command, args, and env for subprocess execution
"""
config = _load_config()
output_dir = config.get("output_dir", "downloads/diagrams/mermaid")
# Ensure output directory exists
Path(output_dir).mkdir(parents=True, exist_ok=True)
# Build environment variables for the MCP server
env = {
"PATH": os.environ.get("PATH", ""),
"HOME": os.environ.get("HOME", os.environ.get("USERPROFILE", "")),
"APPDATA": os.environ.get("APPDATA", ""),
"TEMP": os.environ.get("TEMP", os.environ.get("TMP", "")),
}
# Add config as env vars for the server
env["MERMAID_OUTPUT_DIR"] = str(Path(output_dir).absolute())
env["MERMAID_DEFAULT_FORMAT"] = config.get("default_format", "png")
env["MERMAID_THEME"] = config.get("theme", "default")
return {
"command": "npx",
"args": ["-y", "@peng-shawn/mermaid-mcp-server"],
"env": env,
}
# Tool names exposed by mermaid-mcp-server
# Based on @peng-shawn/mermaid-mcp-server documentation
MERMAID_TOOLS: List[str] = [
"render_mermaid", # Main tool: convert Mermaid syntax to PNG
]

View File

@@ -10,6 +10,7 @@ import subprocess
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
from datetime import datetime
import threading
from claude_agent_sdk import tool, create_sdk_mcp_server
import httpx
from bs4 import BeautifulSoup
@@ -21,9 +22,16 @@ try:
except ImportError:
MEMORY_AVAILABLE = False
# Import agent registry for delegate_task tool
try:
from agent_registry import get_agent
AGENT_REGISTRY_AVAILABLE = True
except ImportError:
AGENT_REGISTRY_AVAILABLE = False
# Maximum characters of tool output to return (prevents token explosion)
_MAX_TOOL_OUTPUT = 5000
_MAX_TOOL_OUTPUT = 5000 # Restored for complex diagram generation
# Maximum page size for web fetching (500KB)
_MAX_WEB_PAGE_SIZE = 500_000
@@ -188,7 +196,7 @@ def _is_safe_url(url: str) -> bool:
},
)
async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
"""Read and return file contents."""
"""Read and return file contents with auto-retry for PDFs."""
file_path = args["file_path"]
path = Path(file_path)
@@ -198,6 +206,136 @@ async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
"isError": True
}
# Check if it's a PDF
is_pdf = path.suffix.lower() == ".pdf"
if is_pdf:
# Try reading PDF with multiple methods
for attempt, method in enumerate(["pypdf", "pdfplumber", "pdfminer"], 1):
try:
if method == "pypdf":
try:
from pypdf import PdfReader
except ImportError:
continue # Try next method
reader = PdfReader(path)
# Check if actually password-protected
if reader.is_encrypted:
# Try with empty password first (some PDFs are "encrypted" with no password)
try:
reader.decrypt("")
except Exception:
return {
"content": [{"type": "text", "text": f"PDF is password-protected and cannot be read without the password."}],
"isError": True
}
# Extract text from all pages with early truncation
text_parts = []
total_length = 0
truncated = False
for i, page in enumerate(reader.pages, 1):
page_text = page.extract_text()
if page_text.strip():
page_section = f"--- Page {i} ---\n{page_text}"
# Check if adding this page would exceed limit
if total_length + len(page_section) + 2 > _MAX_TOOL_OUTPUT: # +2 for "\n\n"
# Add partial page if there's room
remaining = _MAX_TOOL_OUTPUT - total_length - 2
if remaining > 100: # Only add if we can fit meaningful content
text_parts.append(page_section[:remaining])
truncated = True
break
text_parts.append(page_section)
total_length += len(page_section) + 2
content = "\n\n".join(text_parts)
if truncated:
content += f"\n... (PDF truncated - showing first {len(text_parts)} of {len(reader.pages)} pages)"
return {
"content": [{"type": "text", "text": f"Content of {file_path} ({len(reader.pages)} pages):\n\n{content}"}]
}
elif method == "pdfplumber":
try:
import pdfplumber
except ImportError:
continue
with pdfplumber.open(path) as pdf:
text_parts = []
total_length = 0
truncated = False
total_pages = len(pdf.pages)
for i, page in enumerate(pdf.pages, 1):
page_text = page.extract_text()
if page_text and page_text.strip():
page_section = f"--- Page {i} ---\n{page_text}"
# Check if adding this page would exceed limit
if total_length + len(page_section) + 2 > _MAX_TOOL_OUTPUT:
remaining = _MAX_TOOL_OUTPUT - total_length - 2
if remaining > 100:
text_parts.append(page_section[:remaining])
truncated = True
break
text_parts.append(page_section)
total_length += len(page_section) + 2
content = "\n\n".join(text_parts)
if truncated:
content += f"\n... (PDF truncated - showing first {len(text_parts)} of {total_pages} pages)"
return {
"content": [{"type": "text", "text": f"Content of {file_path} ({total_pages} pages):\n\n{content}"}]
}
elif method == "pdfminer":
try:
from pdfminer.high_level import extract_text as pdfminer_extract
except ImportError:
continue
content = pdfminer_extract(path)
if len(content) > _MAX_TOOL_OUTPUT:
content = content[:_MAX_TOOL_OUTPUT] + "\n... (PDF truncated)"
return {
"content": [{"type": "text", "text": f"Content of {file_path}:\n\n{content}"}]
}
except Exception as e:
# If this is the last attempt, return the error
if attempt == 3:
error_msg = str(e).lower()
if "password" in error_msg or "encrypted" in error_msg:
return {
"content": [{"type": "text", "text": f"PDF appears to be password-protected: {str(e)}"}],
"isError": True
}
else:
return {
"content": [{"type": "text", "text": f"Error reading PDF after trying multiple methods: {str(e)}. The PDF might be corrupted or use an unsupported format."}],
"isError": True
}
# Otherwise, continue to next method
continue
# If we get here, no PDF library is installed
return {
"content": [{"type": "text", "text": f"Cannot read PDF: No PDF library installed. Install with: pip install pypdf pdfplumber"}],
"isError": True
}
# Non-PDF files: try reading as text
try:
content = path.read_text(encoding="utf-8")
if len(content) > _MAX_TOOL_OUTPUT:
@@ -206,6 +344,12 @@ async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]:
return {
"content": [{"type": "text", "text": f"Content of {file_path}:\n\n{content}"}]
}
except UnicodeDecodeError:
# Binary file that's not a PDF
return {
"content": [{"type": "text", "text": f"Error: {file_path} appears to be a binary file. Only text files and PDFs are supported."}],
"isError": True
}
except Exception as e:
return {
"content": [{"type": "text", "text": f"Error reading file: {str(e)}"}],
@@ -1778,6 +1922,129 @@ async def gitea_get_tree_tool(args: Dict[str, Any]) -> Dict[str, Any]:
}
# ============================================
# Sub-Agent Delegation Tool (MCP Bridge)
# ============================================
@tool(
name="delegate_task",
description=(
"Delegate a task to a specialist sub-agent. The sub-agent runs in a separate "
"thread with its own conversation context but shares the memory workspace. "
"Use this to parallelize work (e.g., creating multiple diagrams, researching "
"multiple topics). Each sub-agent gets a specialist prompt defining its role. "
"Returns the sub-agent's final response text."
),
input_schema={
"task": str,
"specialist_prompt": str,
"agent_id": str,
},
)
async def delegate_task_tool(args: Dict[str, Any]) -> Dict[str, Any]:
"""Delegate a task to a specialist sub-agent via the main Agent.
This MCP tool bridges the gap between the Agent SDK subprocess (claude.exe)
and the in-process Agent.delegate() method. It retrieves the main Agent
from the global registry and calls delegate() synchronously.
Thread-safe: Agent.delegate() uses Agent._chat_lock internally, and
MemorySystem.write_memory() uses _write_lock for file operations.
"""
task = args.get("task", "")
specialist_prompt = args.get("specialist_prompt", "")
agent_id = args.get("agent_id", "")
# Validate required fields
if not task:
return {
"content": [{"type": "text", "text": "Error: 'task' is required"}],
"isError": True,
}
if not specialist_prompt:
return {
"content": [{
"type": "text",
"text": "Error: 'specialist_prompt' is required (defines the sub-agent role)",
}],
"isError": True,
}
# Check agent registry availability
if not AGENT_REGISTRY_AVAILABLE:
return {
"content": [{
"type": "text",
"text": "Error: agent_registry module not available. Cannot delegate tasks.",
}],
"isError": True,
}
# Get the main agent from the global registry
agent = get_agent()
if agent is None:
return {
"content": [{
"type": "text",
"text": (
"Error: No agent registered. The bot may still be starting up, "
"or agent_registry.register_agent() was not called at startup."
),
}],
"isError": True,
}
# Generate agent_id if not provided
if not agent_id:
agent_id = f"sub_{threading.current_thread().name}_{id(args)}"
try:
# Run delegate in a thread to avoid blocking the async event loop.
# Agent.delegate() is synchronous (calls sub_agent.chat() which holds _chat_lock).
import asyncio
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None, # Use default thread pool
lambda: agent.delegate(
task=task,
specialist_prompt=specialist_prompt,
username="default",
agent_id=agent_id,
max_retries=1,
),
)
# Truncate result if too large
if len(result) > _MAX_TOOL_OUTPUT:
result = result[:_MAX_TOOL_OUTPUT] + "\n... (sub-agent output truncated)"
return {
"content": [{
"type": "text",
"text": f"[Sub-agent {agent_id}] Task completed:\n\n{result}",
}],
}
except TimeoutError:
return {
"content": [{
"type": "text",
"text": f"Error: Sub-agent '{agent_id}' timed out. Task may be too complex.",
}],
"isError": True,
}
except Exception as e:
return {
"content": [{
"type": "text",
"text": f"Error delegating to sub-agent '{agent_id}': {type(e).__name__}: {str(e)}",
}],
"isError": True,
}
# Create the MCP server with all tools
file_system_server = create_sdk_mcp_server(
name="file_system",
@@ -1817,5 +2084,7 @@ file_system_server = create_sdk_mcp_server(
gitea_list_files_tool,
gitea_search_code_tool,
gitea_get_tree_tool,
# Sub-agent delegation
delegate_task_tool,
]
)

View File

@@ -6,6 +6,7 @@ Inspired by OpenClaw's memory implementation but simplified.
import hashlib
import sqlite3
import threading
import time
from datetime import datetime
from pathlib import Path
@@ -84,6 +85,11 @@ class MemorySystem:
self.db = sqlite3.connect(str(self.db_path), check_same_thread=False)
self.db.row_factory = sqlite3.Row
# Write lock for concurrent sub-agent access to shared memory files.
# Prevents race conditions when multiple sub-agents write to the same
# daily log or MEMORY.md simultaneously.
self._write_lock = threading.Lock()
self._init_schema()
self._init_special_files()
@@ -575,32 +581,34 @@ class MemorySystem:
return compact
def write_memory(self, content: str, daily: bool = True) -> None:
"""Write to memory file."""
if daily:
today = datetime.now().strftime("%Y-%m-%d")
file_path = self.memory_dir / f"{today}.md"
else:
file_path = self.workspace_dir / "MEMORY.md"
"""Write to memory file. Thread-safe via _write_lock."""
with self._write_lock:
if daily:
today = datetime.now().strftime("%Y-%m-%d")
file_path = self.memory_dir / f"{today}.md"
else:
file_path = self.workspace_dir / "MEMORY.md"
if file_path.exists():
existing = file_path.read_text(encoding="utf-8")
content = f"{existing}\n\n{content}"
if file_path.exists():
existing = file_path.read_text(encoding="utf-8")
content = f"{existing}\n\n{content}"
file_path.write_text(content, encoding="utf-8")
self.index_file(file_path)
print(f"Written to {file_path.name}")
file_path.write_text(content, encoding="utf-8")
self.index_file(file_path)
print(f"Written to {file_path.name}")
def update_soul(self, content: str, append: bool = False) -> None:
"""Update SOUL.md (agent personality)."""
soul_file = self.workspace_dir / "SOUL.md"
"""Update SOUL.md (agent personality). Thread-safe via _write_lock."""
with self._write_lock:
soul_file = self.workspace_dir / "SOUL.md"
if append and soul_file.exists():
existing = soul_file.read_text(encoding="utf-8")
content = f"{existing}\n\n{content}"
if append and soul_file.exists():
existing = soul_file.read_text(encoding="utf-8")
content = f"{existing}\n\n{content}"
soul_file.write_text(content, encoding="utf-8")
self.index_file(soul_file)
print("Updated SOUL.md")
soul_file.write_text(content, encoding="utf-8")
self.index_file(soul_file)
print("Updated SOUL.md")
def update_user(
self, username: str, content: str, append: bool = False

197
sub_agent_manager.py Normal file
View File

@@ -0,0 +1,197 @@
"""Sub-Agent Manager - Monitors and manages sub-agent lifecycle.
Handles:
- Sub-agent spawning and tracking
- Progress monitoring and hang detection
- Automatic cleanup and restart on timeout
"""
import time
import threading
from typing import Dict, Optional, Any
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class SubAgentState:
"""Track state of a running sub-agent."""
agent_id: str
task_description: str
started_at: float
last_activity: float
is_complete: bool = False
result: Optional[str] = None
error: Optional[str] = None
class SubAgentManager:
"""Manages sub-agent lifecycle with hang detection and auto-restart."""
def __init__(self, timeout_seconds: int = 300): # 5 minutes default
"""Initialize manager.
Args:
timeout_seconds: Maximum time without progress before killing sub-agent
"""
self.timeout_seconds = timeout_seconds
self.sub_agents: Dict[str, SubAgentState] = {}
self._lock = threading.Lock()
self._watchdog_thread: Optional[threading.Thread] = None
self._watchdog_running = False
def start_watchdog(self) -> None:
"""Start the watchdog thread that monitors for hung sub-agents."""
if self._watchdog_running:
return
self._watchdog_running = True
self._watchdog_thread = threading.Thread(
target=self._watchdog_loop,
daemon=True,
name="SubAgentWatchdog"
)
self._watchdog_thread.start()
logger.info("[SubAgentManager] Watchdog started (timeout: %ds)", self.timeout_seconds)
def stop_watchdog(self) -> None:
"""Stop the watchdog thread."""
self._watchdog_running = False
if self._watchdog_thread:
self._watchdog_thread.join(timeout=2)
def register_sub_agent(
self,
agent_id: str,
task_description: str
) -> None:
"""Register a new sub-agent for monitoring."""
with self._lock:
now = time.time()
self.sub_agents[agent_id] = SubAgentState(
agent_id=agent_id,
task_description=task_description,
started_at=now,
last_activity=now
)
logger.info("[SubAgentManager] Registered sub-agent: %s - %s", agent_id, task_description)
def update_activity(self, agent_id: str) -> None:
"""Update last activity timestamp for a sub-agent."""
with self._lock:
if agent_id in self.sub_agents:
self.sub_agents[agent_id].last_activity = time.time()
def mark_complete(
self,
agent_id: str,
result: Optional[str] = None,
error: Optional[str] = None
) -> None:
"""Mark a sub-agent as complete."""
with self._lock:
if agent_id in self.sub_agents:
self.sub_agents[agent_id].is_complete = True
self.sub_agents[agent_id].result = result
self.sub_agents[agent_id].error = error
logger.info("[SubAgentManager] Sub-agent completed: %s (success=%s)",
agent_id, error is None)
def get_hung_agents(self) -> list:
"""Get list of sub-agent IDs that appear to be hung."""
now = time.time()
hung = []
with self._lock:
for agent_id, state in self.sub_agents.items():
if state.is_complete:
continue
time_since_activity = now - state.last_activity
if time_since_activity > self.timeout_seconds:
hung.append(agent_id)
logger.warning(
"[SubAgentManager] Sub-agent appears hung: %s - %s (no activity for %.1fs)",
agent_id, state.task_description, time_since_activity
)
return hung
def cleanup_agent(self, agent_id: str) -> None:
"""Clean up a hung sub-agent."""
with self._lock:
if agent_id in self.sub_agents:
state = self.sub_agents[agent_id]
logger.error(
"[SubAgentManager] Cleaning up hung sub-agent: %s - %s (hung for %.1fs)",
agent_id,
state.task_description,
time.time() - state.last_activity
)
# Mark as failed
state.is_complete = True
state.error = f"Timeout: No progress for {self.timeout_seconds}s"
def _watchdog_loop(self) -> None:
"""Watchdog loop that runs in background thread."""
while self._watchdog_running:
try:
hung_agents = self.get_hung_agents()
for agent_id in hung_agents:
self.cleanup_agent(agent_id)
# Check every 30 seconds
time.sleep(30)
except Exception as e:
logger.error("[SubAgentManager] Watchdog error: %s", e)
time.sleep(30)
def get_status(self) -> Dict[str, Any]:
"""Get current status of all sub-agents."""
now = time.time()
status = {
"total": len(self.sub_agents),
"complete": 0,
"running": 0,
"hung": 0,
"agents": []
}
with self._lock:
for agent_id, state in self.sub_agents.items():
agent_status = {
"id": agent_id,
"task": state.task_description,
"runtime": now - state.started_at,
"idle_time": now - state.last_activity,
"complete": state.is_complete,
"has_error": state.error is not None
}
if state.is_complete:
status["complete"] += 1
elif (now - state.last_activity) > self.timeout_seconds:
status["hung"] += 1
else:
status["running"] += 1
status["agents"].append(agent_status)
return status
def clear_completed(self) -> None:
"""Remove completed sub-agents from tracking."""
with self._lock:
completed = [
agent_id for agent_id, state in self.sub_agents.items()
if state.is_complete
]
for agent_id in completed:
del self.sub_agents[agent_id]
if completed:
logger.info("[SubAgentManager] Cleared %d completed sub-agents", len(completed))