"""SSH MCP Server for remote command execution via SSH. Provides SSH access to remote hosts for the bot. """ import asyncio from typing import Any, Dict try: import paramiko PARAMIKO_AVAILABLE = True except ImportError: PARAMIKO_AVAILABLE = False from claude_agent_sdk import tool, create_sdk_mcp_server @tool( name="ssh_execute", description="Execute a command on a remote host via SSH. Returns stdout, stderr, and exit code.", input_schema={ "host": str, "username": str, "password": str, "key_filename": str, "command": str, "port": int, }, ) async def ssh_execute(args: Dict[str, Any]) -> Dict[str, Any]: """Execute a command on a remote host via SSH.""" if not PARAMIKO_AVAILABLE: return { "content": [{"type": "text", "text": "Error: paramiko not installed. Run: pip install paramiko"}], "isError": True } host = args.get("host") username = args.get("username") password = args.get("password") key_filename = args.get("key_filename") command = args.get("command") port = args.get("port", 22) if not all([host, username, command]): return { "content": [{"type": "text", "text": "Error: Missing required parameters: host, username, command"}], "isError": True } if not password and not key_filename: return { "content": [{"type": "text", "text": "Error: Must provide either password or key_filename for authentication"}], "isError": True } try: # Run SSH command in thread pool to avoid blocking loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, _execute_ssh_sync, host, port, username, password, key_filename, command ) # Format result as MCP-compliant text content if result["success"]: output_parts = [f"SSH command executed on {result['host']} (auth: {result['auth_method']})"] output_parts.append(f"Exit code: {result['exit_code']}") if result["stdout"]: stdout = result["stdout"] if len(stdout) > 5000: stdout = stdout[:5000] + "\n... (stdout truncated)" output_parts.append(f"\nSTDOUT:\n{stdout}") if result["stderr"]: stderr = result["stderr"] if len(stderr) > 5000: stderr = stderr[:5000] + "\n... (stderr truncated)" output_parts.append(f"\nSTDERR:\n{stderr}") if not result["stdout"] and not result["stderr"]: output_parts.append("\n(no output)") return { "content": [{"type": "text", "text": "\n".join(output_parts)}], "isError": result["exit_code"] != 0 } else: return { "content": [{"type": "text", "text": f"SSH Error: {result['error']}"}], "isError": True } except Exception as e: return { "content": [{"type": "text", "text": f"SSH execution failed: {str(e)}"}], "isError": True } def _execute_ssh_sync(host: str, port: int, username: str, password: str, key_filename: str, command: str) -> Dict[str, Any]: """Synchronous SSH execution (runs in thread pool).""" client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: # Build connection parameters connect_kwargs = { "hostname": host, "port": port, "username": username, "timeout": 30, } # Use key-based auth if key_filename provided, otherwise use password if key_filename: connect_kwargs["key_filename"] = key_filename else: connect_kwargs["password"] = password client.connect(**connect_kwargs) stdin, stdout, stderr = client.exec_command(command) stdout_text = stdout.read().decode('utf-8') stderr_text = stderr.read().decode('utf-8') exit_code = stdout.channel.recv_exit_status() return { "success": True, "stdout": stdout_text, "stderr": stderr_text, "exit_code": exit_code, "host": host, "auth_method": "key" if key_filename else "password", } finally: client.close() @tool( name="ssh_file_upload", description="Upload a file to a remote host via SFTP. Returns success status and file paths.", input_schema={ "host": str, "username": str, "password": str, "key_filename": str, "local_path": str, "remote_path": str, "port": int, }, ) async def ssh_file_upload(args: Dict[str, Any]) -> Dict[str, Any]: """Upload a file to a remote host via SFTP.""" if not PARAMIKO_AVAILABLE: return { "content": [{"type": "text", "text": "Error: paramiko not installed. Run: pip install paramiko"}], "isError": True } host = args.get("host") username = args.get("username") password = args.get("password") key_filename = args.get("key_filename") local_path = args.get("local_path") remote_path = args.get("remote_path") port = args.get("port", 22) if not all([host, username, local_path, remote_path]): return { "content": [{"type": "text", "text": "Error: Missing required parameters: host, username, local_path, remote_path"}], "isError": True } if not password and not key_filename: return { "content": [{"type": "text", "text": "Error: Must provide either password or key_filename for authentication"}], "isError": True } try: loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, _upload_file_sync, host, port, username, password, key_filename, local_path, remote_path ) # Format result as MCP-compliant text content if result["success"]: text = ( f"File uploaded successfully via SFTP\n" f"Host: {result['host']}\n" f"Auth: {result['auth_method']}\n" f"Local: {result['local_path']}\n" f"Remote: {result['remote_path']}" ) return { "content": [{"type": "text", "text": text}] } else: return { "content": [{"type": "text", "text": f"SFTP Error: {result['error']}"}], "isError": True } except Exception as e: return { "content": [{"type": "text", "text": f"SFTP upload failed: {str(e)}"}], "isError": True } def _upload_file_sync(host: str, port: int, username: str, password: str, key_filename: str, local_path: str, remote_path: str) -> Dict[str, Any]: """Synchronous SFTP upload (runs in thread pool).""" transport = paramiko.Transport((host, port)) try: # Use key-based auth if key_filename provided, otherwise use password if key_filename: private_key = paramiko.RSAKey.from_private_key_file(key_filename) transport.connect(username=username, pkey=private_key) else: transport.connect(username=username, password=password) sftp = paramiko.SFTPClient.from_transport(transport) sftp.put(local_path, remote_path) return { "success": True, "local_path": local_path, "remote_path": remote_path, "host": host, "auth_method": "key" if key_filename else "password", } finally: sftp.close() if 'sftp' in locals() else None transport.close() # Create the MCP server ssh_mcp_server = create_sdk_mcp_server( name="ssh", version="1.0.0", tools=[ssh_execute, ssh_file_upload], )