diff --git a/adapters/base.py b/adapters/base.py index bd84892..bac40dd 100644 --- a/adapters/base.py +++ b/adapters/base.py @@ -145,6 +145,26 @@ class BaseAdapter(ABC): async def send_typing_indicator(self, channel_id: str) -> None: """Show typing indicator. Optional.""" + async def send_file( + self, + channel_id: str, + file_path: str, + caption: Optional[str] = None, + thread_id: Optional[str] = None + ) -> Dict[str, Any]: + """Send a file attachment to the platform. Optional - override if supported. + + Args: + channel_id: Channel/chat ID to send to + file_path: Absolute path to file + caption: Optional caption/text with the file + thread_id: Optional thread/reply ID + + Returns: + Dict with success status and message_id or error + """ + return {"success": False, "error": "send_file not implemented"} + async def health_check(self) -> Dict[str, Any]: """Perform health check on the adapter.""" return { diff --git a/adapters/runtime.py b/adapters/runtime.py index b4df0d2..16096a6 100644 --- a/adapters/runtime.py +++ b/adapters/runtime.py @@ -98,6 +98,61 @@ class AdapterRuntime: print("[Runtime] Warning: No event loop for message dispatch") self._message_queue.put_nowait(message) + async def _detect_and_send_diagrams( + self, + response: str, + adapter: BaseAdapter, + channel_id: str, + thread_id: Optional[str] + ) -> None: + """Detect diagram file paths in response and auto-send them. 
+ + Args: + response: Agent's text response + adapter: Platform adapter to send files with + channel_id: Channel/chat ID + thread_id: Thread/message ID for replies + """ + import re + from pathlib import Path + + # Match diagram file paths: "Saved to: path/to/diagram.png" + # Pattern matches common phrases followed by file path with image/diagram extensions + pattern = r"(?:Saved|Created|Generated|Exported|File saved|Output file)\s*(?:to|at)?[:\s]+([^\s]+\.(?:png|svg|pdf|jpg|jpeg))" + matches = re.findall(pattern, response, re.IGNORECASE) + + if not matches: + return + + sent_files = [] + for file_path_str in matches: + try: + file_path = Path(file_path_str) + + # Check if file exists + if not file_path.exists(): + print(f"[Runtime] Diagram file not found: {file_path}") + continue + + # Send file via adapter + result = await adapter.send_file( + channel_id=channel_id, + file_path=str(file_path.absolute()), + caption=f"Diagram: {file_path.name}", + thread_id=thread_id, + ) + + if result.get("success"): + sent_files.append(file_path.name) + print(f"[Runtime] Sent diagram file: {file_path.name}") + else: + print(f"[Runtime] Failed to send diagram: {result.get('error')}") + except Exception as e: + print(f"[Runtime] Error sending diagram file: {e}") + + if sent_files: + print(f"[Runtime] Successfully sent {len(sent_files)} diagram file(s)") + async def _process_message_queue(self) -> None: """Background task to process incoming messages.""" print("[Runtime] Message processing loop started") @@ -169,12 +224,22 @@ class AdapterRuntime: user_message=processed_message.text, username=username, progress_callback=progress_callback, + inbound_message=processed_message, ) # Apply postprocessors for postprocessor in self._postprocessors: response = postprocessor(response, processed_message) + # NEW: Detect and send diagram files mentioned in response + if adapter: + await self._detect_and_send_diagrams( + response, + adapter, + message.channel_id, + message.thread_id, + ) + # 
Send response back if adapter: reply_to = ( diff --git a/adapters/slack/adapter.py b/adapters/slack/adapter.py index fbc48bf..bb9c590 100644 --- a/adapters/slack/adapter.py +++ b/adapters/slack/adapter.py @@ -58,19 +58,29 @@ class SlackAdapter(BaseAdapter): ) def validate_config(self) -> bool: - """Validate Slack configuration.""" + """Validate Slack configuration. + + Required scopes for bot token: + - files:read (for downloading file attachments) + - files:write (for uploading files - future feature) + """ if not self.config.credentials: return False bot_token = self.config.credentials.get("bot_token", "") app_token = self.config.credentials.get("app_token", "") - return ( + valid = ( bool(bot_token and app_token) and bot_token.startswith("xoxb-") and app_token.startswith("xapp-") ) + if valid: + print("[Slack] ✓ Config valid. Ensure bot has 'files:read' and 'files:write' scopes at api.slack.com") + + return valid + async def start(self) -> None: """Start the Slack Socket Mode connection.""" if not self.validate_config(): @@ -116,9 +126,40 @@ class SlackAdapter(BaseAdapter): channel = event.get("channel") thread_ts = event.get("thread_ts") ts = event.get("ts") + files = event.get("files", []) + + # DEBUG: Log full event structure + print(f"[Slack DEBUG] Event subtype: {event.get('subtype')}") + print(f"[Slack DEBUG] Event has text: {bool(text)}, text length: {len(text)}") + print(f"[Slack DEBUG] Event has files: {bool(files)}, file count: {len(files)}") + + # DEBUG: Log file detection + if files: + print(f"[Slack DEBUG] Detected {len(files)} file(s) in message") + for f in files: + print(f"[Slack DEBUG] File: {f.get('name')} ({f.get('mimetype')}, ID: {f.get('id')})") username = await self._get_username(user_id) + # Determine message type + message_type = MessageType.FILE if files else MessageType.TEXT + + # Download files + downloaded_files = [] + for file_info in files: + print(f"[Slack DEBUG] Downloading: {file_info.get('name')} (ID: {file_info.get('id')})") + 
result = await self._download_slack_file(file_info) + if result["success"]: + print(f"[Slack DEBUG] Downloaded to: {result['file_path']}") + downloaded_files.append(result) + else: + print(f"[Slack] Failed to download file {file_info.get('name')}: {result['error']}") + + # If files but no text, add placeholder + if files and not text: + file_names = ", ".join(f["filename"] for f in downloaded_files) + text = f"[Uploaded {len(downloaded_files)} file(s): {file_names}]" + inbound_msg = InboundMessage( platform="slack", user_id=user_id, @@ -127,11 +168,12 @@ class SlackAdapter(BaseAdapter): channel_id=channel, thread_id=thread_ts, reply_to_id=None, - message_type=MessageType.TEXT, + message_type=message_type, metadata={ "ts": ts, "team": event.get("team"), "channel_type": event.get("channel_type"), + "files": downloaded_files, }, raw=event, ) @@ -146,9 +188,35 @@ class SlackAdapter(BaseAdapter): channel = event.get("channel") thread_ts = event.get("thread_ts") ts = event.get("ts") + files = event.get("files", []) + + # DEBUG: Log file detection + if files: + print(f"[Slack DEBUG @mention] Detected {len(files)} file(s)") + for f in files: + print(f"[Slack DEBUG @mention] File: {f.get('name')} ({f.get('mimetype')})") username = await self._get_username(user_id) + # Determine message type + message_type = MessageType.FILE if files else MessageType.TEXT + + # Download files + downloaded_files = [] + for file_info in files: + print(f"[Slack DEBUG @mention] Downloading: {file_info.get('name')} (ID: {file_info.get('id')})") + result = await self._download_slack_file(file_info) + if result["success"]: + print(f"[Slack DEBUG @mention] Downloaded to: {result['file_path']}") + downloaded_files.append(result) + else: + print(f"[Slack @mention] Failed to download file {file_info.get('name')}: {result['error']}") + + # If files but no text (after stripping mention), add placeholder + if files and not text: + file_names = ", ".join(f["filename"] for f in downloaded_files) + text = 
f"[Uploaded {len(downloaded_files)} file(s): {file_names}]" + inbound_msg = InboundMessage( platform="slack", user_id=user_id, @@ -157,17 +225,88 @@ class SlackAdapter(BaseAdapter): channel_id=channel, thread_id=thread_ts, reply_to_id=None, - message_type=MessageType.TEXT, + message_type=message_type, metadata={ "ts": ts, "mentioned": True, "team": event.get("team"), + "files": downloaded_files, }, raw=event, ) self._dispatch_message(inbound_msg) + async def _download_slack_file( + self, + file_info: Dict[str, Any], + output_dir: str = "downloads/slack" + ) -> Dict[str, Any]: + """Download a file from Slack using url_private_download. + + Args: + file_info: File object from Slack event (contains url_private_download, name, etc.) + output_dir: Directory to save files (default: "downloads/slack") + + Returns: + Dict with success, file_path, filename, mimetype, size, or error + """ + import aiohttp + from pathlib import Path + from datetime import datetime + + url = file_info.get("url_private_download") + token = self.config.credentials["bot_token"] + headers = {"Authorization": f"Bearer {token}"} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers) as response: + if response.status == 403: + return { + "success": False, + "error": "Permission denied. 
Add 'files:read' scope to bot at api.slack.com → OAuth & Permissions → Bot Token Scopes" + } + elif response.status == 404: + return {"success": False, "error": "File not found or expired"} + elif response.status != 200: + return {"success": False, "error": f"HTTP {response.status}"} + + content_type = response.headers.get("Content-Type", "") + file_data = await response.read() + + # Detect HTML login page (auth failure) + if content_type.startswith("text/html") or file_data.startswith(b" Dict[str, Any]: @@ -256,6 +395,45 @@ class SlackAdapter(BaseAdapter): "error": str(e.response.get("error")), } + async def send_file( + self, + channel_id: str, + file_path: str, + caption: Optional[str] = None, + thread_id: Optional[str] = None + ) -> Dict[str, Any]: + """Upload a file to Slack channel.""" + if not self.app: + return {"success": False, "error": "Adapter not started"} + + try: + from pathlib import Path + path = Path(file_path) + + if not path.exists(): + return {"success": False, "error": f"File not found: {file_path}"} + + result = await self.app.client.files_upload_v2( + channel=channel_id, + file=str(path.absolute()), + title=path.name, + initial_comment=caption or "", + thread_ts=thread_id, + ) + + return { + "success": True, + "message_id": result["file"]["id"], + "file_path": file_path, + } + except SlackApiError as e: + error_msg = e.response["error"] + print(f"[Slack] Error uploading file: {error_msg}") + return {"success": False, "error": error_msg} + except Exception as e: + print(f"[Slack] Unexpected error uploading file: {e}") + return {"success": False, "error": str(e)} + async def _get_username(self, user_id: str) -> str: """Get username from user ID, with caching to avoid excessive API calls. 
diff --git a/adapters/telegram/adapter.py b/adapters/telegram/adapter.py index 1b5d348..523e783 100644 --- a/adapters/telegram/adapter.py +++ b/adapters/telegram/adapter.py @@ -306,6 +306,58 @@ class TelegramAdapter(BaseAdapter): except TelegramError as e: print(f"[Telegram] Error sending typing indicator: {e}") + async def send_file( + self, + channel_id: str, + file_path: str, + caption: Optional[str] = None, + thread_id: Optional[str] = None + ) -> Dict[str, Any]: + """Send a file (image or document) to Telegram.""" + if not self.bot: + return {"success": False, "error": "Bot not started"} + + try: + from pathlib import Path + path = Path(file_path) + + if not path.exists(): + return {"success": False, "error": f"File not found: {file_path}"} + + chat_id = int(channel_id) + reply_to = int(thread_id) if thread_id else None + ext = path.suffix.lower() + + # Send as photo for images, document for others + if ext in [".png", ".jpg", ".jpeg", ".gif", ".webp"]: + with open(path, "rb") as photo: + sent = await self.bot.send_photo( + chat_id=chat_id, + photo=photo, + caption=caption, + reply_to_message_id=reply_to, + ) + else: + with open(path, "rb") as document: + sent = await self.bot.send_document( + chat_id=chat_id, + document=document, + caption=caption, + reply_to_message_id=reply_to, + ) + + return { + "success": True, + "message_id": sent.message_id, + "file_path": file_path, + } + except TelegramError as e: + print(f"[Telegram] Error sending file: {e}") + return {"success": False, "error": str(e)} + except Exception as e: + print(f"[Telegram] Unexpected error sending file: {e}") + return {"success": False, "error": str(e)} + async def health_check(self) -> Dict[str, Any]: """Perform health check.""" base_health = await super().health_check() diff --git a/agent_registry.py b/agent_registry.py new file mode 100644 index 0000000..4d1cc3a --- /dev/null +++ b/agent_registry.py @@ -0,0 +1,76 @@ +"""Agent Registry - Thread-safe global singleton for MCP tool access. 
+ +MCP tools are module-level functions that cannot access the Agent instance +directly. This registry provides a thread-safe bridge so that tools like +delegate_task can call Agent.delegate() without circular imports. + +Usage: + # At bot startup (bot_runner.py): + from agent_registry import register_agent + agent = Agent(...) + register_agent(agent) + + # In MCP tools (mcp_tools.py): + from agent_registry import get_agent + agent = get_agent() + if agent: + result = agent.delegate(task, specialist_prompt) +""" + +import threading +from typing import Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from agent import Agent + +# Module-level singleton state +_agent: Optional['Agent'] = None +_lock = threading.Lock() + + +def register_agent(agent: 'Agent') -> None: + """Register the main Agent instance for MCP tool access. + + Must be called exactly once at bot startup, after Agent is initialized. + Thread-safe. + + Args: + agent: The main Agent instance (not a sub-agent). + + Raises: + ValueError: If agent is None or is a sub-agent. + """ + global _agent + + if agent is None: + raise ValueError("Cannot register None as the main agent") + if getattr(agent, 'is_sub_agent', False): + raise ValueError("Cannot register a sub-agent as the main agent") + + with _lock: + _agent = agent + print(f"[AgentRegistry] Main agent registered (provider={agent.llm.provider})") + + +def get_agent() -> Optional['Agent']: + """Get the registered main Agent instance. + + Thread-safe. Returns None if no agent has been registered yet. + + Returns: + The main Agent instance, or None. + """ + with _lock: + return _agent + + +def clear_agent() -> None: + """Clear the registered agent (for testing or shutdown). + + Thread-safe. 
+ """ + global _agent + + with _lock: + _agent = None + print("[AgentRegistry] Agent registry cleared") diff --git a/bot_runner.py b/bot_runner.py index a255029..59ec802 100644 --- a/bot_runner.py +++ b/bot_runner.py @@ -26,6 +26,7 @@ from adapters.runtime import AdapterRuntime from adapters.slack.adapter import SlackAdapter from adapters.telegram.adapter import TelegramAdapter from agent import Agent +from agent_registry import register_agent from config.config_loader import ConfigLoader from google_tools.oauth_manager import GoogleOAuthManager from scheduled_tasks import TaskScheduler @@ -79,6 +80,9 @@ class BotRunner: ) print("[Setup] Agent initialized") + # Register agent in global registry for MCP tool access (delegate_task) + register_agent(self.agent) + self.runtime = AdapterRuntime(self.agent) enabled_count = sum( diff --git a/config/excalidraw_mcp.yaml b/config/excalidraw_mcp.yaml new file mode 100644 index 0000000..be512a2 --- /dev/null +++ b/config/excalidraw_mcp.yaml @@ -0,0 +1,6 @@ +excalidraw_mcp: + enabled: true + output_dir: "downloads/diagrams/excalidraw" # Default location; bot can save elsewhere if requested + default_export_type: "png" # Options: png, svg + canvas_width: 1920 + canvas_height: 1080 diff --git a/config/mermaid_mcp.yaml b/config/mermaid_mcp.yaml new file mode 100644 index 0000000..5202650 --- /dev/null +++ b/config/mermaid_mcp.yaml @@ -0,0 +1,5 @@ +mermaid_mcp: + enabled: true + output_dir: "downloads/diagrams/mermaid" # Default location; bot can save elsewhere if requested + default_format: "png" # Options: png, svg, pdf + theme: "default" # Options: default, dark, forest, neutral diff --git a/mcp_servers/excalidraw/excalidraw_mcp.py b/mcp_servers/excalidraw/excalidraw_mcp.py new file mode 100644 index 0000000..22b0b85 --- /dev/null +++ b/mcp_servers/excalidraw/excalidraw_mcp.py @@ -0,0 +1,89 @@ +"""Excalidraw MCP Server Integration. + +Manages the external excalidraw-mcp server process for hand-drawn style diagram generation. 
+ +Architecture: + Garvis → (stdio) → excalidraw-mcp (Node.js) → Canvas API → PNG/SVG images +""" + +import os +from pathlib import Path +from typing import Any, Dict, List +import yaml + +_CONFIG_FILE = Path("config/excalidraw_mcp.yaml") + +def _load_config() -> Dict[str, Any]: + """Load Excalidraw MCP configuration from YAML and env vars.""" + config = {} + if _CONFIG_FILE.exists(): + try: + with open(_CONFIG_FILE, encoding="utf-8") as f: + config = yaml.safe_load(f) or {} + except Exception: + pass # Use defaults if config fails to load + + excalidraw = config.get("excalidraw_mcp", {}) + + # Apply env var overrides + if os.getenv("EXCALIDRAW_ENABLED"): + excalidraw["enabled"] = os.getenv("EXCALIDRAW_ENABLED").lower() in ("true", "1") + if os.getenv("EXCALIDRAW_OUTPUT_DIR"): + excalidraw["output_dir"] = os.getenv("EXCALIDRAW_OUTPUT_DIR") + if os.getenv("EXCALIDRAW_DEFAULT_EXPORT"): + excalidraw["default_export_type"] = os.getenv("EXCALIDRAW_DEFAULT_EXPORT") + + # Set defaults if not configured + excalidraw.setdefault("enabled", True) + excalidraw.setdefault("output_dir", "downloads/diagrams/excalidraw") + excalidraw.setdefault("default_export_type", "png") + excalidraw.setdefault("canvas_width", 1920) + excalidraw.setdefault("canvas_height", 1080) + + return excalidraw + +def is_excalidraw_enabled() -> bool: + """Check if Excalidraw MCP integration is enabled.""" + config = _load_config() + return config.get("enabled", True) + +def get_excalidraw_server_config() -> Dict[str, Any]: + """Build the MCP server configuration for Agent SDK registration. 
+ + Returns: + Dict with command, args, and env for subprocess execution + """ + config = _load_config() + output_dir = config.get("output_dir", "downloads/diagrams/excalidraw") + + # Ensure output directory exists + Path(output_dir).mkdir(parents=True, exist_ok=True) + + # Build environment variables for the MCP server + env = { + "PATH": os.environ.get("PATH", ""), + "HOME": os.environ.get("HOME", os.environ.get("USERPROFILE", "")), + "APPDATA": os.environ.get("APPDATA", ""), + "TEMP": os.environ.get("TEMP", os.environ.get("TMP", "")), + } + + # Add config as env vars for the server + env["EXCALIDRAW_OUTPUT_DIR"] = str(Path(output_dir).absolute()) + env["EXCALIDRAW_DEFAULT_EXPORT"] = config.get("default_export_type", "png") + env["EXCALIDRAW_CANVAS_WIDTH"] = str(config.get("canvas_width", 1920)) + env["EXCALIDRAW_CANVAS_HEIGHT"] = str(config.get("canvas_height", 1080)) + + return { + "command": "npx", + "args": ["-y", "excalidraw-mcp"], + "env": env, + } + +# Tool names exposed by excalidraw-mcp +# Based on excalidraw-mcp documentation +EXCALIDRAW_TOOLS: List[str] = [ + "create_diagram", # Create new Excalidraw diagram + "add_element", # Add shape/element to diagram + "export_diagram", # Export to PNG/SVG + "list_diagrams", # List saved diagrams +] diff --git a/mcp_servers/mermaid/mermaid_mcp.py b/mcp_servers/mermaid/mermaid_mcp.py new file mode 100644 index 0000000..c182971 --- /dev/null +++ b/mcp_servers/mermaid/mermaid_mcp.py @@ -0,0 +1,86 @@ +"""Mermaid MCP Server Integration. + +Manages the external @peng-shawn/mermaid-mcp-server process for diagram generation. 
+ +Architecture: + Garvis → (stdio) → mermaid-mcp-server (Node.js) → Puppeteer → PNG images +""" + +import os +from pathlib import Path +from typing import Any, Dict, List +import yaml + +_CONFIG_FILE = Path("config/mermaid_mcp.yaml") + +def _load_config() -> Dict[str, Any]: + """Load Mermaid MCP configuration from YAML and env vars.""" + config = {} + if _CONFIG_FILE.exists(): + try: + with open(_CONFIG_FILE, encoding="utf-8") as f: + config = yaml.safe_load(f) or {} + except Exception: + pass # Use defaults if config fails to load + + mermaid = config.get("mermaid_mcp", {}) + + # Apply env var overrides + if os.getenv("MERMAID_ENABLED"): + mermaid["enabled"] = os.getenv("MERMAID_ENABLED").lower() in ("true", "1") + if os.getenv("MERMAID_OUTPUT_DIR"): + mermaid["output_dir"] = os.getenv("MERMAID_OUTPUT_DIR") + if os.getenv("MERMAID_DEFAULT_FORMAT"): + mermaid["default_format"] = os.getenv("MERMAID_DEFAULT_FORMAT") + if os.getenv("MERMAID_THEME"): + mermaid["theme"] = os.getenv("MERMAID_THEME") + + # Set defaults if not configured + mermaid.setdefault("enabled", True) + mermaid.setdefault("output_dir", "downloads/diagrams/mermaid") + mermaid.setdefault("default_format", "png") + mermaid.setdefault("theme", "default") + + return mermaid + +def is_mermaid_enabled() -> bool: + """Check if Mermaid MCP integration is enabled.""" + config = _load_config() + return config.get("enabled", True) + +def get_mermaid_server_config() -> Dict[str, Any]: + """Build the MCP server configuration for Agent SDK registration. 
+ + Returns: + Dict with command, args, and env for subprocess execution + """ + config = _load_config() + output_dir = config.get("output_dir", "downloads/diagrams/mermaid") + + # Ensure output directory exists + Path(output_dir).mkdir(parents=True, exist_ok=True) + + # Build environment variables for the MCP server + env = { + "PATH": os.environ.get("PATH", ""), + "HOME": os.environ.get("HOME", os.environ.get("USERPROFILE", "")), + "APPDATA": os.environ.get("APPDATA", ""), + "TEMP": os.environ.get("TEMP", os.environ.get("TMP", "")), + } + + # Add config as env vars for the server + env["MERMAID_OUTPUT_DIR"] = str(Path(output_dir).absolute()) + env["MERMAID_DEFAULT_FORMAT"] = config.get("default_format", "png") + env["MERMAID_THEME"] = config.get("theme", "default") + + return { + "command": "npx", + "args": ["-y", "@peng-shawn/mermaid-mcp-server"], + "env": env, + } + +# Tool names exposed by mermaid-mcp-server +# Based on @peng-shawn/mermaid-mcp-server documentation +MERMAID_TOOLS: List[str] = [ + "render_mermaid", # Main tool: convert Mermaid syntax to PNG +] diff --git a/mcp_tools.py b/mcp_tools.py index b163a2c..99bbfae 100644 --- a/mcp_tools.py +++ b/mcp_tools.py @@ -10,6 +10,7 @@ import subprocess from typing import Any, Dict, List, Optional from urllib.parse import urlparse from datetime import datetime +import threading from claude_agent_sdk import tool, create_sdk_mcp_server import httpx from bs4 import BeautifulSoup @@ -21,9 +22,16 @@ try: except ImportError: MEMORY_AVAILABLE = False +# Import agent registry for delegate_task tool +try: + from agent_registry import get_agent + AGENT_REGISTRY_AVAILABLE = True +except ImportError: + AGENT_REGISTRY_AVAILABLE = False + # Maximum characters of tool output to return (prevents token explosion) -_MAX_TOOL_OUTPUT = 5000 +_MAX_TOOL_OUTPUT = 5000 # Restored for complex diagram generation # Maximum page size for web fetching (500KB) _MAX_WEB_PAGE_SIZE = 500_000 @@ -188,7 +196,7 @@ def _is_safe_url(url: str) -> 
bool: }, ) async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: - """Read and return file contents.""" + """Read and return file contents with auto-retry for PDFs.""" file_path = args["file_path"] path = Path(file_path) @@ -198,6 +206,136 @@ async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: "isError": True } + # Check if it's a PDF + is_pdf = path.suffix.lower() == ".pdf" + + if is_pdf: + # Try reading PDF with multiple methods + for attempt, method in enumerate(["pypdf", "pdfplumber", "pdfminer"], 1): + try: + if method == "pypdf": + try: + from pypdf import PdfReader + except ImportError: + continue # Try next method + + reader = PdfReader(path) + + # Check if actually password-protected + if reader.is_encrypted: + # Try with empty password first (some PDFs are "encrypted" with no password) + try: + reader.decrypt("") + except Exception: + return { + "content": [{"type": "text", "text": f"PDF is password-protected and cannot be read without the password."}], + "isError": True + } + + # Extract text from all pages with early truncation + text_parts = [] + total_length = 0 + truncated = False + + for i, page in enumerate(reader.pages, 1): + page_text = page.extract_text() + if page_text.strip(): + page_section = f"--- Page {i} ---\n{page_text}" + + # Check if adding this page would exceed limit + if total_length + len(page_section) + 2 > _MAX_TOOL_OUTPUT: # +2 for "\n\n" + # Add partial page if there's room + remaining = _MAX_TOOL_OUTPUT - total_length - 2 + if remaining > 100: # Only add if we can fit meaningful content + text_parts.append(page_section[:remaining]) + truncated = True + break + + text_parts.append(page_section) + total_length += len(page_section) + 2 + + content = "\n\n".join(text_parts) + if truncated: + content += f"\n... 
(PDF truncated - showing first {len(text_parts)} of {len(reader.pages)} pages)" + + return { + "content": [{"type": "text", "text": f"Content of {file_path} ({len(reader.pages)} pages):\n\n{content}"}] + } + + elif method == "pdfplumber": + try: + import pdfplumber + except ImportError: + continue + + with pdfplumber.open(path) as pdf: + text_parts = [] + total_length = 0 + truncated = False + total_pages = len(pdf.pages) + + for i, page in enumerate(pdf.pages, 1): + page_text = page.extract_text() + if page_text and page_text.strip(): + page_section = f"--- Page {i} ---\n{page_text}" + + # Check if adding this page would exceed limit + if total_length + len(page_section) + 2 > _MAX_TOOL_OUTPUT: + remaining = _MAX_TOOL_OUTPUT - total_length - 2 + if remaining > 100: + text_parts.append(page_section[:remaining]) + truncated = True + break + + text_parts.append(page_section) + total_length += len(page_section) + 2 + + content = "\n\n".join(text_parts) + if truncated: + content += f"\n... (PDF truncated - showing first {len(text_parts)} of {total_pages} pages)" + + return { + "content": [{"type": "text", "text": f"Content of {file_path} ({total_pages} pages):\n\n{content}"}] + } + + elif method == "pdfminer": + try: + from pdfminer.high_level import extract_text as pdfminer_extract + except ImportError: + continue + + content = pdfminer_extract(path) + if len(content) > _MAX_TOOL_OUTPUT: + content = content[:_MAX_TOOL_OUTPUT] + "\n... (PDF truncated)" + + return { + "content": [{"type": "text", "text": f"Content of {file_path}:\n\n{content}"}] + } + + except Exception as e: + # If this is the last attempt, return the error + if attempt == 3: + error_msg = str(e).lower() + if "password" in error_msg or "encrypted" in error_msg: + return { + "content": [{"type": "text", "text": f"PDF appears to be password-protected: {str(e)}"}], + "isError": True + } + else: + return { + "content": [{"type": "text", "text": f"Error reading PDF after trying multiple methods: {str(e)}. 
The PDF might be corrupted or use an unsupported format."}], + "isError": True + } + # Otherwise, continue to next method + continue + + # If we get here, no PDF library is installed + return { + "content": [{"type": "text", "text": f"Cannot read PDF: No PDF library installed. Install with: pip install pypdf pdfplumber"}], + "isError": True + } + + # Non-PDF files: try reading as text try: content = path.read_text(encoding="utf-8") if len(content) > _MAX_TOOL_OUTPUT: @@ -206,6 +344,12 @@ async def read_file_tool(args: Dict[str, Any]) -> Dict[str, Any]: return { "content": [{"type": "text", "text": f"Content of {file_path}:\n\n{content}"}] } + except UnicodeDecodeError: + # Binary file that's not a PDF + return { + "content": [{"type": "text", "text": f"Error: {file_path} appears to be a binary file. Only text files and PDFs are supported."}], + "isError": True + } except Exception as e: return { "content": [{"type": "text", "text": f"Error reading file: {str(e)}"}], @@ -1778,6 +1922,129 @@ async def gitea_get_tree_tool(args: Dict[str, Any]) -> Dict[str, Any]: } + + +# ============================================ +# Sub-Agent Delegation Tool (MCP Bridge) +# ============================================ + +@tool( + name="delegate_task", + description=( + "Delegate a task to a specialist sub-agent. The sub-agent runs in a separate " + "thread with its own conversation context but shares the memory workspace. " + "Use this to parallelize work (e.g., creating multiple diagrams, researching " + "multiple topics). Each sub-agent gets a specialist prompt defining its role. " + "Returns the sub-agent's final response text." + ), + input_schema={ + "task": str, + "specialist_prompt": str, + "agent_id": str, + }, +) +async def delegate_task_tool(args: Dict[str, Any]) -> Dict[str, Any]: + """Delegate a task to a specialist sub-agent via the main Agent. + + This MCP tool bridges the gap between the Agent SDK subprocess (claude.exe) + and the in-process Agent.delegate() method. 
It retrieves the main Agent + from the global registry and calls delegate() synchronously. + + Thread-safe: Agent.delegate() uses Agent._chat_lock internally, and + MemorySystem.write_memory() uses _write_lock for file operations. + """ + task = args.get("task", "") + specialist_prompt = args.get("specialist_prompt", "") + agent_id = args.get("agent_id", "") + + # Validate required fields + if not task: + return { + "content": [{"type": "text", "text": "Error: 'task' is required"}], + "isError": True, + } + if not specialist_prompt: + return { + "content": [{ + "type": "text", + "text": "Error: 'specialist_prompt' is required (defines the sub-agent role)", + }], + "isError": True, + } + + # Check agent registry availability + if not AGENT_REGISTRY_AVAILABLE: + return { + "content": [{ + "type": "text", + "text": "Error: agent_registry module not available. Cannot delegate tasks.", + }], + "isError": True, + } + + # Get the main agent from the global registry + agent = get_agent() + if agent is None: + return { + "content": [{ + "type": "text", + "text": ( + "Error: No agent registered. The bot may still be starting up, " + "or agent_registry.register_agent() was not called at startup." + ), + }], + "isError": True, + } + + # Generate agent_id if not provided + if not agent_id: + agent_id = f"sub_{threading.current_thread().name}_{id(args)}" + + try: + # Run delegate in a thread to avoid blocking the async event loop. + # Agent.delegate() is synchronous (calls sub_agent.chat() which holds _chat_lock). + import asyncio + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, # Use default thread pool + lambda: agent.delegate( + task=task, + specialist_prompt=specialist_prompt, + username="default", + agent_id=agent_id, + max_retries=1, + ), + ) + + # Truncate result if too large + if len(result) > _MAX_TOOL_OUTPUT: + result = result[:_MAX_TOOL_OUTPUT] + "\n... 
(sub-agent output truncated)" + + return { + "content": [{ + "type": "text", + "text": f"[Sub-agent {agent_id}] Task completed:\n\n{result}", + }], + } + + except TimeoutError: + return { + "content": [{ + "type": "text", + "text": f"Error: Sub-agent '{agent_id}' timed out. Task may be too complex.", + }], + "isError": True, + } + except Exception as e: + return { + "content": [{ + "type": "text", + "text": f"Error delegating to sub-agent '{agent_id}': {type(e).__name__}: {str(e)}", + }], + "isError": True, + } + + # Create the MCP server with all tools file_system_server = create_sdk_mcp_server( name="file_system", @@ -1817,5 +2084,7 @@ file_system_server = create_sdk_mcp_server( gitea_list_files_tool, gitea_search_code_tool, gitea_get_tree_tool, + # Sub-agent delegation + delegate_task_tool, ] ) diff --git a/memory_system.py b/memory_system.py index dc4a812..1af0dee 100644 --- a/memory_system.py +++ b/memory_system.py @@ -6,6 +6,7 @@ Inspired by OpenClaw's memory implementation but simplified. import hashlib import sqlite3 +import threading import time from datetime import datetime from pathlib import Path @@ -84,6 +85,11 @@ class MemorySystem: self.db = sqlite3.connect(str(self.db_path), check_same_thread=False) self.db.row_factory = sqlite3.Row + # Write lock for concurrent sub-agent access to shared memory files. + # Prevents race conditions when multiple sub-agents write to the same + # daily log or MEMORY.md simultaneously. + self._write_lock = threading.Lock() + self._init_schema() self._init_special_files() @@ -575,32 +581,34 @@ class MemorySystem: return compact def write_memory(self, content: str, daily: bool = True) -> None: - """Write to memory file.""" - if daily: - today = datetime.now().strftime("%Y-%m-%d") - file_path = self.memory_dir / f"{today}.md" - else: - file_path = self.workspace_dir / "MEMORY.md" + """Write to memory file. 
Thread-safe via _write_lock.""" + with self._write_lock: + if daily: + today = datetime.now().strftime("%Y-%m-%d") + file_path = self.memory_dir / f"{today}.md" + else: + file_path = self.workspace_dir / "MEMORY.md" - if file_path.exists(): - existing = file_path.read_text(encoding="utf-8") - content = f"{existing}\n\n{content}" + if file_path.exists(): + existing = file_path.read_text(encoding="utf-8") + content = f"{existing}\n\n{content}" - file_path.write_text(content, encoding="utf-8") - self.index_file(file_path) - print(f"Written to {file_path.name}") + file_path.write_text(content, encoding="utf-8") + self.index_file(file_path) + print(f"Written to {file_path.name}") def update_soul(self, content: str, append: bool = False) -> None: - """Update SOUL.md (agent personality).""" - soul_file = self.workspace_dir / "SOUL.md" + """Update SOUL.md (agent personality). Thread-safe via _write_lock.""" + with self._write_lock: + soul_file = self.workspace_dir / "SOUL.md" - if append and soul_file.exists(): - existing = soul_file.read_text(encoding="utf-8") - content = f"{existing}\n\n{content}" + if append and soul_file.exists(): + existing = soul_file.read_text(encoding="utf-8") + content = f"{existing}\n\n{content}" - soul_file.write_text(content, encoding="utf-8") - self.index_file(soul_file) - print("Updated SOUL.md") + soul_file.write_text(content, encoding="utf-8") + self.index_file(soul_file) + print("Updated SOUL.md") def update_user( self, username: str, content: str, append: bool = False diff --git a/sub_agent_manager.py b/sub_agent_manager.py new file mode 100644 index 0000000..677f902 --- /dev/null +++ b/sub_agent_manager.py @@ -0,0 +1,197 @@ +"""Sub-Agent Manager - Monitors and manages sub-agent lifecycle. 
# Handles:
# - Sub-agent registration and tracking
# - Progress monitoring and hang detection
# - Marking hung sub-agents as failed on timeout
#
# NOTE: nothing in this module terminates or restarts the work a sub-agent is
# doing; a timeout only flips the bookkeeping state to "complete with error".

import logging
import threading
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


@dataclass
class SubAgentState:
    """Track state of a running sub-agent."""

    agent_id: str
    task_description: str
    started_at: float  # time.time() when the agent was registered
    last_activity: float  # time.time() of registration or the latest update_activity()
    is_complete: bool = False
    result: Optional[str] = None
    error: Optional[str] = None


class SubAgentManager:
    """Track sub-agents and flag the ones that stop reporting activity.

    All access to ``sub_agents`` goes through ``_lock``. An optional daemon
    watchdog thread periodically marks agents as failed when they have been
    idle for longer than ``timeout_seconds``. The manager only updates its
    own records; it does not stop or restart the underlying work.
    """

    def __init__(
        self,
        timeout_seconds: int = 300,  # 5 minutes default
        check_interval_seconds: float = 30.0,
    ):
        """Initialize manager.

        Args:
            timeout_seconds: Maximum time without activity before a sub-agent
                is considered hung.
            check_interval_seconds: Pause between two watchdog passes.
        """
        self.timeout_seconds = timeout_seconds
        self.check_interval_seconds = check_interval_seconds
        self.sub_agents: Dict[str, SubAgentState] = {}
        self._lock = threading.Lock()
        self._watchdog_thread: Optional[threading.Thread] = None
        self._watchdog_running = False
        # Replaced on every start_watchdog() so that a thread which is still
        # winding down can never be revived by a later start.
        self._stop_event = threading.Event()

    def start_watchdog(self) -> None:
        """Start the watchdog thread that monitors for hung sub-agents.

        Does nothing when the watchdog is already running.
        """
        if self._watchdog_running:
            return

        stop_event = threading.Event()
        self._stop_event = stop_event
        self._watchdog_running = True
        self._watchdog_thread = threading.Thread(
            target=self._watchdog_loop,
            args=(stop_event,),
            daemon=True,
            name="SubAgentWatchdog",
        )
        self._watchdog_thread.start()
        logger.info("[SubAgentManager] Watchdog started (timeout: %ds)", self.timeout_seconds)

    def stop_watchdog(self) -> None:
        """Stop the watchdog thread and wait up to 2 seconds for it to exit."""
        self._watchdog_running = False
        # Wakes the loop out of its inter-pass wait immediately.
        self._stop_event.set()
        thread = self._watchdog_thread
        # Joining the current thread would raise RuntimeError.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2)

    def register_sub_agent(
        self,
        agent_id: str,
        task_description: str
    ) -> None:
        """Register a new sub-agent for monitoring.

        An existing entry with the same ``agent_id`` is replaced.
        """
        with self._lock:
            now = time.time()
            self.sub_agents[agent_id] = SubAgentState(
                agent_id=agent_id,
                task_description=task_description,
                started_at=now,
                last_activity=now,
            )
        logger.info("[SubAgentManager] Registered sub-agent: %s - %s", agent_id, task_description)

    def update_activity(self, agent_id: str) -> None:
        """Update last activity timestamp for a sub-agent (no-op if unknown)."""
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is not None:
                state.last_activity = time.time()

    def mark_complete(
        self,
        agent_id: str,
        result: Optional[str] = None,
        error: Optional[str] = None
    ) -> None:
        """Mark a sub-agent as complete, storing its result or error.

        Unknown ``agent_id`` values are ignored.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None:
                logger.debug("[SubAgentManager] mark_complete for unknown sub-agent: %s", agent_id)
                return
            state.is_complete = True
            state.result = result
            state.error = error
        logger.info("[SubAgentManager] Sub-agent completed: %s (success=%s)",
                    agent_id, error is None)

    def get_hung_agents(self) -> List[str]:
        """Get list of sub-agent IDs that appear to be hung.

        An agent is hung when it is not complete and its last activity is
        more than ``timeout_seconds`` in the past.
        """
        hung: List[str] = []

        with self._lock:
            now = time.time()
            for agent_id, state in self.sub_agents.items():
                if state.is_complete:
                    continue

                time_since_activity = now - state.last_activity
                if time_since_activity > self.timeout_seconds:
                    hung.append(agent_id)
                    logger.warning(
                        "[SubAgentManager] Sub-agent appears hung: %s - %s (no activity for %.1fs)",
                        agent_id, state.task_description, time_since_activity
                    )

        return hung

    def cleanup_agent(self, agent_id: str) -> None:
        """Mark a hung sub-agent as failed with a timeout error.

        Unknown agents are ignored. Agents that are already complete are left
        untouched, so a result recorded between hang detection and cleanup is
        not overwritten.
        """
        with self._lock:
            state = self.sub_agents.get(agent_id)
            if state is None or state.is_complete:
                return

            logger.error(
                "[SubAgentManager] Cleaning up hung sub-agent: %s - %s (hung for %.1fs)",
                agent_id,
                state.task_description,
                time.time() - state.last_activity
            )

            # Mark as failed
            state.is_complete = True
            state.error = f"Timeout: No progress for {self.timeout_seconds}s"

    def _watchdog_loop(self, stop_event: Optional[threading.Event] = None) -> None:
        """Watchdog loop that runs in background thread.

        Args:
            stop_event: Event that ends this loop when set. Defaults to the
                manager's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event

        while self._watchdog_running and not stop_event.is_set():
            try:
                for agent_id in self.get_hung_agents():
                    self.cleanup_agent(agent_id)
            except Exception as e:
                # Keep the watchdog alive; the traceback goes to the log.
                logger.exception("[SubAgentManager] Watchdog error: %s", e)

            # Interruptible pause: returns early as soon as stop is requested.
            stop_event.wait(self.check_interval_seconds)

    def get_status(self) -> Dict[str, Any]:
        """Get current status of all sub-agents.

        Returns:
            Dict with ``total``, ``complete``, ``running`` and ``hung`` counts
            plus an ``agents`` list holding one summary dict per sub-agent.
            The whole snapshot is taken under the lock so the counts always
            add up to ``total``.
        """
        with self._lock:
            now = time.time()
            status: Dict[str, Any] = {
                "total": len(self.sub_agents),
                "complete": 0,
                "running": 0,
                "hung": 0,
                "agents": [],
            }

            for agent_id, state in self.sub_agents.items():
                idle_time = now - state.last_activity

                if state.is_complete:
                    status["complete"] += 1
                elif idle_time > self.timeout_seconds:
                    status["hung"] += 1
                else:
                    status["running"] += 1

                status["agents"].append({
                    "id": agent_id,
                    "task": state.task_description,
                    "runtime": now - state.started_at,
                    "idle_time": idle_time,
                    "complete": state.is_complete,
                    "has_error": state.error is not None,
                })

        return status

    def clear_completed(self) -> None:
        """Remove completed sub-agents from tracking."""
        with self._lock:
            completed = [
                agent_id for agent_id, state in self.sub_agents.items()
                if state.is_complete
            ]
            for agent_id in completed:
                del self.sub_agents[agent_id]

        if completed:
            logger.info("[SubAgentManager] Cleared %d completed sub-agents", len(completed))