Files

259 lines
7.7 KiB
Python
Raw Permalink Normal View History

"""
Base adapter interface for messaging platforms.
Inspired by OpenClaw's ChannelPlugin architecture but simplified
for ajarbot's needs.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
class MessageType(Enum):
"""Types of messages that can be sent or received."""
TEXT = "text"
MEDIA = "media"
FILE = "file"
REACTION = "reaction"
@dataclass
class InboundMessage:
"""Represents a message received from a messaging platform."""
platform: str
user_id: str
username: str
text: str
channel_id: str
thread_id: Optional[str]
reply_to_id: Optional[str]
message_type: MessageType
metadata: Dict[str, Any]
raw: Any
@dataclass
class OutboundMessage:
"""Represents a message to be sent to a messaging platform."""
platform: str
channel_id: str
text: str
thread_id: Optional[str] = None
reply_to_id: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AdapterConfig:
"""Configuration for an adapter instance."""
platform: str
enabled: bool = True
credentials: Dict[str, Any] = field(default_factory=dict)
settings: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AdapterCapabilities:
"""Describes what a messaging platform adapter can do."""
supports_threads: bool = False
supports_reactions: bool = False
supports_media: bool = False
supports_files: bool = False
supports_markdown: bool = False
max_message_length: int = 2000
chunking_strategy: Optional[str] = None # "word", "markdown", "char"
class BaseAdapter(ABC):
"""
Base adapter interface for messaging platforms.
Core aspects:
- Config: Platform configuration and credentials
- Gateway: Connection lifecycle management
- Outbound: Sending messages
- Inbound: Receiving and parsing messages
- Status: Health checks and monitoring
"""
def __init__(self, config: AdapterConfig) -> None:
self.config = config
self.is_running = False
self._message_handlers: List[Callable[[InboundMessage], None]] = []
# --- Core Interface (Required) ---
@property
@abstractmethod
def platform_name(self) -> str:
"""Platform identifier (e.g., 'slack', 'telegram')."""
@property
@abstractmethod
def capabilities(self) -> AdapterCapabilities:
"""Describe platform capabilities."""
@abstractmethod
async def start(self) -> None:
"""Start the adapter connection."""
@abstractmethod
async def stop(self) -> None:
"""Stop the adapter connection."""
@abstractmethod
async def send_message(
self, message: OutboundMessage
) -> Dict[str, Any]:
"""
Send a message to the platform.
Returns:
Dict with at least {"success": bool, "message_id": str}
"""
@abstractmethod
def validate_config(self) -> bool:
"""Validate that the adapter is properly configured."""
# --- Message Handler Registration ---
def register_message_handler(
self, handler: Callable[[InboundMessage], None]
) -> None:
"""Register a function to be called when messages are received."""
self._message_handlers.append(handler)
def _dispatch_message(self, message: InboundMessage) -> None:
"""Internal: Dispatch incoming message to all registered handlers."""
for handler in self._message_handlers:
try:
handler(message)
except Exception as e:
print(f"Error in message handler: {e}")
# --- Optional Features (Can be overridden) ---
async def send_reaction(
self, channel_id: str, message_id: str, emoji: str
) -> bool:
"""Send a reaction/emoji to a message. Optional."""
return False
async def send_typing_indicator(self, channel_id: str) -> None:
"""Show typing indicator. Optional."""
async def health_check(self) -> Dict[str, Any]:
"""Perform health check on the adapter."""
return {
"platform": self.platform_name,
"running": self.is_running,
"healthy": self.is_running and self.validate_config(),
}
def chunk_text(self, text: str) -> List[str]:
"""Split long text into chunks based on platform limits."""
max_len = self.capabilities.max_message_length
if len(text) <= max_len:
return [text]
strategy = self.capabilities.chunking_strategy or "word"
if strategy == "word":
return self._chunk_by_words(text, max_len)
elif strategy == "char":
return self._chunk_by_chars(text, max_len)
elif strategy == "markdown":
return self._chunk_by_lines(text, max_len)
return [text]
@staticmethod
def _chunk_by_words(text: str, max_len: int) -> List[str]:
"""Split text on word boundaries."""
words = text.split()
chunks: List[str] = []
current_chunk: List[str] = []
current_length = 0
for word in words:
word_length = len(word) + 1 # +1 for space
if current_length + word_length > max_len:
chunks.append(" ".join(current_chunk))
current_chunk = [word]
current_length = word_length
else:
current_chunk.append(word)
current_length += word_length
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
@staticmethod
def _chunk_by_chars(text: str, max_len: int) -> List[str]:
"""Split text at fixed character boundaries."""
return [text[i:i + max_len] for i in range(0, len(text), max_len)]
@staticmethod
def _chunk_by_lines(text: str, max_len: int) -> List[str]:
"""Split text on line boundaries preserving markdown."""
lines = text.split("\n")
chunks: List[str] = []
current_chunk: List[str] = []
current_length = 0
for line in lines:
line_length = len(line) + 1 # +1 for newline
if current_length + line_length > max_len:
chunks.append("\n".join(current_chunk))
current_chunk = [line]
current_length = line_length
else:
current_chunk.append(line)
current_length += line_length
if current_chunk:
chunks.append("\n".join(current_chunk))
return chunks
class AdapterRegistry:
"""Registry for managing multiple platform adapters."""
def __init__(self) -> None:
self._adapters: Dict[str, BaseAdapter] = {}
def register(self, adapter: BaseAdapter) -> None:
"""Register an adapter instance."""
self._adapters[adapter.platform_name] = adapter
def get(self, platform_name: str) -> Optional[BaseAdapter]:
"""Get an adapter by platform name."""
return self._adapters.get(platform_name)
def list_platforms(self) -> List[str]:
"""List all registered platform names."""
return list(self._adapters.keys())
def get_all(self) -> List[BaseAdapter]:
"""Get all registered adapters."""
return list(self._adapters.values())
async def start_all(self) -> None:
"""Start all registered adapters."""
for adapter in self._adapters.values():
if adapter.config.enabled:
await adapter.start()
async def stop_all(self) -> None:
"""Stop all registered adapters."""
for adapter in self._adapters.values():
if adapter.is_running:
await adapter.stop()