"""LLM Interface - Claude API, GLM, and other models. Supports two modes for Claude: 1. Agent SDK (v0.1.36+) - DEFAULT - Uses query() API with Max subscription - Set USE_AGENT_SDK=true (default) - Model: claude-sonnet-4-5-20250929 (default for all operations) - All tools are MCP-based (no API key needed) - Tools registered via mcp_tools.py MCP server - Flat-rate subscription cost 2. Direct API (pay-per-token) - Set USE_DIRECT_API=true - Model: claude-sonnet-4-5-20250929 - Requires ANTHROPIC_API_KEY in .env - Uses traditional tool definitions from tools.py """ import asyncio import atexit import logging import os import subprocess import threading from typing import Any, Dict, List, Optional, Set import requests from anthropic import Anthropic from usage_tracker import UsageTracker logger = logging.getLogger(__name__) # Ensure our debug messages are visible even if root logger is at WARNING. # Only add a handler if none exist (prevent duplicate output). if not logger.handlers: _handler = logging.StreamHandler() _handler.setFormatter(logging.Formatter( "%(asctime)s [%(name)s] %(levelname)s: %(message)s", datefmt="%H:%M:%S", )) logger.addHandler(_handler) logger.setLevel(logging.DEBUG) # Try to import Agent SDK (optional dependency) try: from claude_agent_sdk import ( ClaudeAgentOptions, ResultMessage, ) AGENT_SDK_AVAILABLE = True except ImportError: AGENT_SDK_AVAILABLE = False # API key environment variable names by provider _API_KEY_ENV_VARS = { "claude": "ANTHROPIC_API_KEY", "glm": "GLM_API_KEY", } # Mode selection (priority: USE_DIRECT_API > default to Agent SDK) _USE_DIRECT_API = os.getenv("USE_DIRECT_API", "false").lower() == "true" _USE_AGENT_SDK = os.getenv("USE_AGENT_SDK", "true").lower() == "true" # Default models by provider _DEFAULT_MODELS = { "claude": "claude-sonnet-4-5-20250929", "claude_agent_sdk": "claude-sonnet-4-5-20250929", "glm": "glm-4-plus", } _GLM_BASE_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions" # Track PIDs of claude.exe subprocesses we spawn (to avoid killing user's Claude Code session!) _TRACKED_CLAUDE_PIDS: Set[int] = set() _TRACKED_PIDS_LOCK = threading.Lock() def _register_claude_subprocess(pid: int): """Register a claude.exe subprocess PID for cleanup on exit.""" with _TRACKED_PIDS_LOCK: _TRACKED_CLAUDE_PIDS.add(pid) logger.debug("[LLM] Registered claude.exe subprocess PID: %d", pid) def _cleanup_tracked_claude_processes(): """Kill only the claude.exe processes we spawned (not the user's Claude Code session!)""" with _TRACKED_PIDS_LOCK: if not _TRACKED_CLAUDE_PIDS: return logger.info("[LLM] Cleaning up %d tracked claude.exe subprocess(es)", len(_TRACKED_CLAUDE_PIDS)) for pid in _TRACKED_CLAUDE_PIDS: try: if os.name == 'nt': # Windows subprocess.run( ['taskkill', '/F', '/PID', str(pid), '/T'], capture_output=True, timeout=2 ) else: # Linux/Mac subprocess.run(['kill', '-9', str(pid)], capture_output=True, timeout=2) logger.debug("[LLM] Killed claude.exe subprocess PID: %d", pid) except Exception as e: logger.debug("[LLM] Failed to kill PID %d: %s", pid, e) _TRACKED_CLAUDE_PIDS.clear() # Register cleanup on exit (only kills our tracked subprocesses, not all claude.exe!) atexit.register(_cleanup_tracked_claude_processes) class LLMInterface: """LLM interface supporting Claude (Agent SDK or Direct API) and GLM.""" def __init__( self, provider: str = "claude", api_key: Optional[str] = None, track_usage: bool = True, ) -> None: self.provider = provider self.api_key = api_key or os.getenv( _API_KEY_ENV_VARS.get(provider, ""), ) self.client: Optional[Anthropic] = None # Reference to the main asyncio event loop, set by the runtime. # Used by Agent SDK mode to schedule async work from worker threads # via asyncio.run_coroutine_threadsafe(). self._event_loop: Optional[asyncio.AbstractEventLoop] = None # Determine mode (priority: direct API > agent SDK) if provider == "claude": if _USE_DIRECT_API: self.mode = "direct_api" elif _USE_AGENT_SDK and AGENT_SDK_AVAILABLE: self.mode = "agent_sdk" else: self.mode = "direct_api" if _USE_AGENT_SDK and not AGENT_SDK_AVAILABLE: print("[LLM] Warning: Agent SDK not available, falling back to Direct API") print("[LLM] Install with: pip install claude-agent-sdk") else: self.mode = "direct_api" # Usage tracking (only for Direct API pay-per-token mode) self.tracker = UsageTracker() if (track_usage and self.mode == "direct_api") else None # Set model based on mode if provider == "claude": if self.mode == "agent_sdk": self.model = _DEFAULT_MODELS.get("claude_agent_sdk", "claude-sonnet-4-5-20250929") else: self.model = _DEFAULT_MODELS.get(provider, "claude-sonnet-4-5-20250929") else: self.model = _DEFAULT_MODELS.get(provider, "") # Initialize based on mode if provider == "claude": if self.mode == "agent_sdk": print(f"[LLM] Using Agent SDK (Max subscription) with model: {self.model}") elif self.mode == "direct_api": print(f"[LLM] Using Direct API (pay-per-token) with model: {self.model}") self.client = Anthropic(api_key=self.api_key) def set_event_loop(self, loop: asyncio.AbstractEventLoop) -> None: """Store a reference to the main asyncio event loop. This allows Agent SDK async calls to be scheduled back onto the main event loop from worker threads (created by asyncio.to_thread). Must be called from the async context that owns the loop. """ self._event_loop = loop logger.info( "[LLM] Event loop stored: %s (running=%s)", type(loop).__name__, loop.is_running(), ) @staticmethod def _clean_claude_env() -> dict: """Remove Claude Code session markers from the environment. The Agent SDK's SubprocessCLITransport copies os.environ into the child process. If the bot is launched from within a Claude Code session (or any environment that sets CLAUDECODE), the child ``claude`` CLI detects the nesting and refuses to start with: "Claude Code cannot be launched inside another Claude Code session." This method temporarily removes the offending variables and returns them so the caller can restore them afterwards. """ saved = {} # Keys that signal an active Claude Code parent session. # CLAUDE_CODE_ENTRYPOINT and CLAUDE_AGENT_SDK_VERSION are set by # the SDK itself on the child process, so removing them from the # parent is safe -- the SDK will set them again. markers = [ "CLAUDECODE", "CLAUDE_CODE_ENTRYPOINT", "CLAUDE_AGENT_SDK_VERSION", "CLAUDE_CODE_ENABLE_SDK_FILE_CHECKPOINTING", ] for key in markers: if key in os.environ: saved[key] = os.environ.pop(key) if saved: logger.debug("[LLM] Cleaned Claude session env vars: %s", list(saved.keys())) return saved @staticmethod def _restore_claude_env(saved: dict) -> None: """Restore previously removed Claude session env vars.""" os.environ.update(saved) def _run_async_from_thread(self, coro) -> Any: """Run an async coroutine from a synchronous worker thread. Uses asyncio.run_coroutine_threadsafe() to schedule the coroutine on the main event loop (if available), which is the correct way to bridge sync -> async when called from an asyncio.to_thread() worker or from any background thread (e.g., the scheduler). Falls back to asyncio.run() if no event loop reference is available (e.g., direct script usage without the adapter runtime). Args: coro: An already-created coroutine object (not a coroutine function). """ current_thread = threading.current_thread().name has_loop = self._event_loop is not None loop_running = has_loop and self._event_loop.is_running() if has_loop and loop_running: logger.info( "[LLM] _run_async_from_thread: using run_coroutine_threadsafe " "(thread=%s, loop=%s)", current_thread, type(self._event_loop).__name__, ) # Schedule on the main event loop and block this thread until done. # This works because: # 1. asyncio.to_thread() runs us in a thread pool while the main # loop continues processing other tasks. # 2. Scheduler threads are plain daemon threads, also not blocking # the main loop. # The coroutine executes on the main loop without deadlocking # because the main loop is free to run while we block here. future = asyncio.run_coroutine_threadsafe(coro, self._event_loop) try: # Block with 10-minute timeout to prevent hangs # Complex tasks (repo analysis, multi-step operations) can take 5-8 minutes logger.info("[LLM] Waiting for Agent SDK response (timeout: 600s)...") result = future.result(timeout=600) logger.info("[LLM] Agent SDK response received successfully") return result except TimeoutError: logger.error("[LLM] ⚠️ Agent SDK call TIMED OUT after 600 seconds!") future.cancel() # Cancel the coroutine raise TimeoutError("Agent SDK call exceeded 10 minute timeout - task may be too complex") else: logger.info( "[LLM] _run_async_from_thread: using asyncio.run() fallback " "(thread=%s, has_loop=%s, loop_running=%s)", current_thread, has_loop, loop_running, ) # Fallback: no main loop available (standalone / test usage). # Create a new event loop in this thread via asyncio.run(). return asyncio.run(coro) def chat( self, messages: List[Dict], system: Optional[str] = None, max_tokens: int = 16384, ) -> str: """Send chat request and get response. In Agent SDK mode, this uses query() which handles MCP tools automatically. In Direct API mode, this is a simple messages.create() call without tools. Raises: Exception: If the API call fails or returns an unexpected response. """ if self.provider == "claude": if self.mode == "agent_sdk": try: logger.info("[LLM] chat: dispatching via Agent SDK") response = self._run_async_from_thread( self._agent_sdk_chat(messages, system, max_tokens) ) return response except Exception as e: logger.error("[LLM] Agent SDK error in chat(): %s", e, exc_info=True) raise Exception(f"Agent SDK error: {e}") elif self.mode == "direct_api": response = self.client.messages.create( model=self.model, max_tokens=max_tokens, system=system or "", messages=messages, ) if self.tracker and hasattr(response, "usage"): self.tracker.track( model=self.model, input_tokens=response.usage.input_tokens, output_tokens=response.usage.output_tokens, cache_creation_tokens=getattr( response.usage, "cache_creation_input_tokens", 0 ), cache_read_tokens=getattr( response.usage, "cache_read_input_tokens", 0 ), ) if not response.content: return "" return response.content[0].text if self.provider == "glm": payload = { "model": self.model, "messages": [ {"role": "system", "content": system or ""}, ] + messages, "max_tokens": max_tokens, } headers = {"Authorization": f"Bearer {self.api_key}"} response = requests.post( _GLM_BASE_URL, json=payload, headers=headers, timeout=60, ) response.raise_for_status() return response.json()["choices"][0]["message"]["content"] raise ValueError(f"Unsupported provider: {self.provider}") def _build_agent_sdk_options(self) -> Optional['ClaudeAgentOptions']: """Build Agent SDK options with MCP servers and allowed tools. Returns configured ClaudeAgentOptions, or None if mcp_tools is unavailable. """ try: from mcp_tools import file_system_server mcp_servers = {"file_system": file_system_server} # All tools registered in the MCP server allowed_tools = [ # File and system tools "read_file", "write_file", "edit_file", "list_directory", "run_command", # Web tool "web_fetch", # Zettelkasten tools "fleeting_note", "daily_note", "literature_note", "permanent_note", "search_vault", "search_by_tags", # Google tools (Gmail, Calendar, Contacts) "get_weather", "send_email", "read_emails", "get_email", "read_calendar", "create_calendar_event", "search_calendar", "create_contact", "list_contacts", "get_contact", # Gitea tools (private repo access) "gitea_read_file", "gitea_list_files", "gitea_search_code", "gitea_get_tree", ] # Conditionally add Obsidian MCP server try: from obsidian_mcp import ( is_obsidian_enabled, check_obsidian_health, get_obsidian_server_config, OBSIDIAN_TOOLS, ) if is_obsidian_enabled() and check_obsidian_health(): obsidian_config = get_obsidian_server_config() mcp_servers["obsidian"] = obsidian_config allowed_tools.extend(OBSIDIAN_TOOLS) print("[LLM] Obsidian MCP server registered (8 tools)") elif is_obsidian_enabled(): print("[LLM] Obsidian MCP enabled but health check failed") except ImportError: pass except Exception as e: print(f"[LLM] Obsidian MCP unavailable: {e}") def _stderr_callback(line: str) -> None: """Log Claude CLI stderr for debugging transport failures.""" logger.debug("[CLI stderr] %s", line) return ClaudeAgentOptions( mcp_servers=mcp_servers, allowed_tools=allowed_tools, permission_mode="bypassPermissions", max_turns=30, # Prevent infinite tool loops (matches MAX_TOOL_ITERATIONS) stderr=_stderr_callback, ) except ImportError: print("[LLM] Warning: mcp_tools not available, no MCP tools will be registered") return None async def _agent_sdk_chat( self, messages: List[Dict], system: Optional[str], max_tokens: int ) -> str: """Agent SDK chat via custom transport flow. Uses the SDK's transport and query layers directly instead of the high-level ``query()`` helper. This works around a bug in ``claude_agent_sdk._internal.client.process_query`` where ``end_input()`` is called immediately after sending the user message for string prompts. That premature stdin close kills the bidirectional control channel that SDK MCP servers need to handle ``tools/list`` and ``tools/call`` requests from the CLI subprocess, resulting in ``CLIConnectionError: ProcessTransport is not ready for writing``. Our fix: defer ``end_input()`` until after the first ``ResultMessage`` is received, matching the logic already present in ``Query.stream_input()`` for async-iterable prompts. """ import json as _json # Lazy imports from SDK internals. from claude_agent_sdk._internal.transport.subprocess_cli import ( SubprocessCLITransport, ) from claude_agent_sdk._internal.query import Query from claude_agent_sdk._internal.message_parser import parse_message # Build the prompt from the system prompt and conversation history. prompt = self._build_sdk_prompt(messages, system) options = self._build_agent_sdk_options() # Clean Claude session env vars so the child CLI process doesn't # detect a "nested session" and refuse to start. saved_env = self._clean_claude_env() try: # --- 1. Create and connect the subprocess transport. --- transport = SubprocessCLITransport(prompt=prompt, options=options) await transport.connect() # Track the subprocess PID for cleanup on exit if hasattr(transport, '_process') and transport._process: _register_claude_subprocess(transport._process.pid) # --- 2. Extract in-process SDK MCP server instances. --- sdk_mcp_servers: Dict = {} if options.mcp_servers and isinstance(options.mcp_servers, dict): for name, config in options.mcp_servers.items(): if isinstance(config, dict) and config.get("type") == "sdk": sdk_mcp_servers[name] = config["instance"] # --- 3. Create the Query object (control-protocol handler). --- query_obj = Query( transport=transport, is_streaming_mode=True, sdk_mcp_servers=sdk_mcp_servers, ) try: # Start the background reader task. await query_obj.start() # Perform the initialize handshake with the CLI. await query_obj.initialize() # Send the user message over stdin. user_msg = { "type": "user", "session_id": "", "message": {"role": "user", "content": prompt}, "parent_tool_use_id": None, } await transport.write(_json.dumps(user_msg) + "\n") # **KEY FIX**: Do NOT call end_input() yet. The CLI will # send MCP control requests (tools/list, tools/call) over # the bidirectional channel. Closing stdin now would # prevent us from writing responses back. We wait for the # first ResultMessage instead. # --- 4. Consume messages until we get a ResultMessage. --- result_text = "" message_count = 0 async for data in query_obj.receive_messages(): message = parse_message(data) message_count += 1 # Log all message types for debugging hangs message_type = type(message).__name__ logger.debug(f"[LLM] Received message #{message_count}: {message_type}") if isinstance(message, ResultMessage): result_text = message.result or "" logger.info( "[LLM] Agent SDK result received after %d messages: cost=$%.4f, turns=%s", message_count, getattr(message, "total_cost_usd", 0), getattr(message, "num_turns", "?"), ) break # Log non-result messages to detect loops if message_count % 10 == 0: logger.warning(f"[LLM] Still waiting for ResultMessage after {message_count} messages...") # Now that we have the result, close stdin gracefully. try: await transport.end_input() except Exception: pass # Process may have already exited; that's fine. return result_text finally: # Always clean up the query/transport. try: await query_obj.close() except Exception: # Suppress errors during cleanup (e.g. if process # already exited and there are pending control # request tasks that can't write back). pass finally: # Always restore env vars, even on error. self._restore_claude_env(saved_env) def _build_sdk_prompt( self, messages: List[Dict], system: Optional[str], ) -> str: """Build a prompt string for the Agent SDK query() from conversation history. The SDK expects a single prompt string. We combine the system prompt and conversation history into a coherent prompt. """ parts = [] if system: parts.append(f"\n{system}\n\n") # Include recent conversation history for context for msg in messages: content = msg.get("content", "") role = msg["role"] if isinstance(content, str): if role == "user": parts.append(f"User: {content}") elif role == "assistant": parts.append(f"Assistant: {content}") elif isinstance(content, list): # Structured content (tool_use/tool_result blocks from Direct API history) text_parts = [] for block in content: if isinstance(block, dict): if block.get("type") == "text": text_parts.append(block.get("text", "")) elif block.get("type") == "tool_result": text_parts.append(f"[Tool result]: {block.get('content', '')}") elif block.get("type") == "tool_use": text_parts.append(f"[Used tool: {block.get('name', 'unknown')}]") elif hasattr(block, "type"): if block.type == "text": text_parts.append(block.text) if text_parts: if role == "user": parts.append(f"User: {' '.join(text_parts)}") elif role == "assistant": parts.append(f"Assistant: {' '.join(text_parts)}") return "\n\n".join(parts) def chat_with_tools( self, messages: List[Dict], tools: List[Dict[str, Any]], system: Optional[str] = None, max_tokens: int = 16384, use_cache: bool = False, ) -> Any: """Send chat request with tool support. In Agent SDK mode: Uses query() with MCP tools. The SDK handles tool execution automatically. Returns a string (final response after all tool calls are resolved). In Direct API mode: Returns an anthropic Message object with potential tool_use blocks that agent.py processes in a manual loop. Args: tools: Tool definitions (used by Direct API; ignored in Agent SDK mode since tools are registered via MCP servers). use_cache: Enable prompt caching for Sonnet (Direct API only). """ if self.provider != "claude": raise ValueError("Tool use only supported for Claude provider") if self.mode == "agent_sdk": # Agent SDK handles tool calls automatically via MCP servers. # We use the same query() path as chat(), since MCP tools are # already registered. The SDK will invoke tools, collect results, # and return the final text response. try: logger.info("[LLM] chat_with_tools: dispatching via Agent SDK") response = self._run_async_from_thread( self._agent_sdk_chat(messages, system, max_tokens) ) return response except Exception as e: logger.error("[LLM] Agent SDK error: %s", e, exc_info=True) raise Exception(f"Agent SDK error: {e}") elif self.mode == "direct_api": enable_caching = use_cache and "sonnet" in self.model.lower() if enable_caching and system: system_blocks = [ { "type": "text", "text": system, "cache_control": {"type": "ephemeral"} } ] else: system_blocks = system or "" response = self.client.messages.create( model=self.model, max_tokens=max_tokens, system=system_blocks, messages=messages, tools=tools, ) if self.tracker and hasattr(response, "usage"): self.tracker.track( model=self.model, input_tokens=response.usage.input_tokens, output_tokens=response.usage.output_tokens, cache_creation_tokens=getattr( response.usage, "cache_creation_input_tokens", 0 ), cache_read_tokens=getattr( response.usage, "cache_read_input_tokens", 0 ), ) return response def set_model(self, model: str) -> None: """Change the active model.""" self.model = model def get_usage_stats(self, target_date: Optional[str] = None) -> Dict: """Get usage statistics and costs. Args: target_date: Date string (YYYY-MM-DD). If None, returns today's stats. Returns: Dict with cost, token counts, and breakdown by model. """ if not self.tracker: return {"error": "Usage tracking not enabled"} return self.tracker.get_daily_cost(target_date)