Features: - Usage tracking system (usage_tracker.py) - Tracks input/output tokens per API call - Calculates costs with support for cache pricing - Stores data in usage_data.json (gitignored) - Integrated into llm_interface.py - Dynamic task scheduler reloading - Auto-detects YAML changes every 60s - No restart needed for new tasks - reload_tasks() method for manual refresh - Example cost tracking scheduled task - Daily API usage report - Budget tracking ($5/month target) - Disabled by default in scheduled_tasks.yaml Improvements: - Fixed tool_use/tool_result pair splitting bug (CRITICAL) - Added thread safety to agent.chat() - Fixed N+1 query problem in hybrid search - Optimized database batch queries - Added conversation history pruning (50 messages max) Updated .gitignore: - Exclude user profiles (memory_workspace/users/*.md) - Exclude usage data (usage_data.json) - Exclude vector index (vectors.usearch) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
295 lines
9.2 KiB
Python
295 lines
9.2 KiB
Python
"""
|
|
Slack Socket Mode adapter for ajarbot.
|
|
|
|
Uses Socket Mode for easy firewall-free integration without webhooks.
|
|
"""
|
|
|
|
import re
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from slack_bolt.adapter.socket_mode.async_handler import (
|
|
AsyncSocketModeHandler,
|
|
)
|
|
from slack_bolt.async_app import AsyncApp
|
|
from slack_sdk.errors import SlackApiError
|
|
|
|
from adapters.base import (
|
|
AdapterCapabilities,
|
|
AdapterConfig,
|
|
BaseAdapter,
|
|
InboundMessage,
|
|
MessageType,
|
|
OutboundMessage,
|
|
)
|
|
|
|
|
|
class SlackAdapter(BaseAdapter):
|
|
"""
|
|
Slack adapter using Socket Mode.
|
|
|
|
Socket Mode allows receiving events over WebSocket without exposing
|
|
a public HTTP endpoint - perfect for development and simple deployments.
|
|
|
|
Configuration required:
|
|
- bot_token: Bot User OAuth Token (xoxb-...)
|
|
- app_token: App-Level Token (xapp-...)
|
|
"""
|
|
|
|
def __init__(self, config: AdapterConfig) -> None:
|
|
super().__init__(config)
|
|
self.app: Optional[AsyncApp] = None
|
|
self.handler: Optional[AsyncSocketModeHandler] = None
|
|
self._username_cache: Dict[str, str] = {} # user_id -> username
|
|
|
|
@property
|
|
def platform_name(self) -> str:
|
|
return "slack"
|
|
|
|
@property
|
|
def capabilities(self) -> AdapterCapabilities:
|
|
return AdapterCapabilities(
|
|
supports_threads=True,
|
|
supports_reactions=True,
|
|
supports_media=True,
|
|
supports_files=True,
|
|
supports_markdown=True,
|
|
max_message_length=4000,
|
|
chunking_strategy="word",
|
|
)
|
|
|
|
def validate_config(self) -> bool:
|
|
"""Validate Slack configuration."""
|
|
if not self.config.credentials:
|
|
return False
|
|
|
|
bot_token = self.config.credentials.get("bot_token", "")
|
|
app_token = self.config.credentials.get("app_token", "")
|
|
|
|
return (
|
|
bool(bot_token and app_token)
|
|
and bot_token.startswith("xoxb-")
|
|
and app_token.startswith("xapp-")
|
|
)
|
|
|
|
async def start(self) -> None:
|
|
"""Start the Slack Socket Mode connection."""
|
|
if not self.validate_config():
|
|
raise ValueError(
|
|
"Invalid Slack configuration. "
|
|
"Need bot_token (xoxb-...) and app_token (xapp-...)"
|
|
)
|
|
|
|
bot_token = self.config.credentials["bot_token"]
|
|
app_token = self.config.credentials["app_token"]
|
|
|
|
self.app = AsyncApp(token=bot_token)
|
|
self._register_handlers()
|
|
|
|
self.handler = AsyncSocketModeHandler(self.app, app_token)
|
|
|
|
print("[Slack] Starting Socket Mode connection...")
|
|
await self.handler.start_async()
|
|
|
|
self.is_running = True
|
|
print("[Slack] Connected and listening for messages")
|
|
|
|
async def stop(self) -> None:
|
|
"""Stop the Slack Socket Mode connection."""
|
|
if self.handler:
|
|
print("[Slack] Stopping Socket Mode connection...")
|
|
await self.handler.close_async()
|
|
self.is_running = False
|
|
print("[Slack] Disconnected")
|
|
|
|
def _register_handlers(self) -> None:
|
|
"""Register Slack event handlers."""
|
|
|
|
@self.app.event("message")
|
|
async def handle_message_events(event, say):
|
|
"""Handle incoming messages."""
|
|
if event.get("subtype") in ["bot_message", "message_changed"]:
|
|
return
|
|
|
|
user_id = event.get("user")
|
|
text = event.get("text", "")
|
|
channel = event.get("channel")
|
|
thread_ts = event.get("thread_ts")
|
|
ts = event.get("ts")
|
|
|
|
username = await self._get_username(user_id)
|
|
|
|
inbound_msg = InboundMessage(
|
|
platform="slack",
|
|
user_id=user_id,
|
|
username=username,
|
|
text=text,
|
|
channel_id=channel,
|
|
thread_id=thread_ts,
|
|
reply_to_id=None,
|
|
message_type=MessageType.TEXT,
|
|
metadata={
|
|
"ts": ts,
|
|
"team": event.get("team"),
|
|
"channel_type": event.get("channel_type"),
|
|
},
|
|
raw=event,
|
|
)
|
|
|
|
self._dispatch_message(inbound_msg)
|
|
|
|
@self.app.event("app_mention")
|
|
async def handle_app_mentions(event, say):
|
|
"""Handle @mentions of the bot."""
|
|
user_id = event.get("user")
|
|
text = self._strip_mention(event.get("text", ""))
|
|
channel = event.get("channel")
|
|
thread_ts = event.get("thread_ts")
|
|
ts = event.get("ts")
|
|
|
|
username = await self._get_username(user_id)
|
|
|
|
inbound_msg = InboundMessage(
|
|
platform="slack",
|
|
user_id=user_id,
|
|
username=username,
|
|
text=text,
|
|
channel_id=channel,
|
|
thread_id=thread_ts,
|
|
reply_to_id=None,
|
|
message_type=MessageType.TEXT,
|
|
metadata={
|
|
"ts": ts,
|
|
"mentioned": True,
|
|
"team": event.get("team"),
|
|
},
|
|
raw=event,
|
|
)
|
|
|
|
self._dispatch_message(inbound_msg)
|
|
|
|
async def send_message(
|
|
self, message: OutboundMessage
|
|
) -> Dict[str, Any]:
|
|
"""Send a message to Slack."""
|
|
if not self.app:
|
|
return {"success": False, "error": "Adapter not started"}
|
|
|
|
try:
|
|
chunks = self.chunk_text(message.text)
|
|
results: List[Dict[str, Any]] = []
|
|
|
|
for i, chunk in enumerate(chunks):
|
|
thread_ts = (
|
|
message.thread_id
|
|
if i == 0
|
|
else results[0].get("ts")
|
|
)
|
|
|
|
result = await self.app.client.chat_postMessage(
|
|
channel=message.channel_id,
|
|
text=chunk,
|
|
thread_ts=thread_ts,
|
|
mrkdwn=True,
|
|
)
|
|
|
|
results.append({
|
|
"ts": result["ts"],
|
|
"channel": result["channel"],
|
|
})
|
|
|
|
return {
|
|
"success": True,
|
|
"message_id": results[0]["ts"],
|
|
"chunks_sent": len(chunks),
|
|
"results": results,
|
|
}
|
|
|
|
except SlackApiError as e:
|
|
error_msg = e.response["error"]
|
|
print(f"[Slack] Error sending message: {error_msg}")
|
|
return {"success": False, "error": error_msg}
|
|
|
|
async def send_reaction(
|
|
self, channel_id: str, message_id: str, emoji: str
|
|
) -> bool:
|
|
"""Add a reaction to a message."""
|
|
if not self.app:
|
|
return False
|
|
|
|
try:
|
|
await self.app.client.reactions_add(
|
|
channel=channel_id,
|
|
timestamp=message_id,
|
|
name=emoji.strip(":"),
|
|
)
|
|
return True
|
|
except SlackApiError as e:
|
|
print(
|
|
f"[Slack] Error adding reaction: {e.response['error']}"
|
|
)
|
|
return False
|
|
|
|
async def send_typing_indicator(self, channel_id: str) -> None:
|
|
"""Slack doesn't have a typing indicator API."""
|
|
|
|
async def health_check(self) -> Dict[str, Any]:
|
|
"""Perform health check."""
|
|
base_health = await super().health_check()
|
|
|
|
if not self.app:
|
|
return {**base_health, "details": "App not initialized"}
|
|
|
|
try:
|
|
response = await self.app.client.auth_test()
|
|
return {
|
|
**base_health,
|
|
"bot_id": response.get("bot_id"),
|
|
"team": response.get("team"),
|
|
"user": response.get("user"),
|
|
"connected": True,
|
|
}
|
|
except SlackApiError as e:
|
|
return {
|
|
**base_health,
|
|
"healthy": False,
|
|
"error": str(e.response.get("error")),
|
|
}
|
|
|
|
async def _get_username(self, user_id: str) -> str:
|
|
"""Get username from user ID, with caching to avoid excessive API calls.
|
|
|
|
Sanitizes the returned username to contain only alphanumeric,
|
|
hyphens, and underscores (matching memory_system validation rules).
|
|
"""
|
|
# Check cache first
|
|
if user_id in self._username_cache:
|
|
return self._username_cache[user_id]
|
|
|
|
if not self.app:
|
|
return user_id
|
|
|
|
try:
|
|
result = await self.app.client.users_info(user=user_id)
|
|
user = result["user"]
|
|
profile = user.get("profile", {})
|
|
raw_username = (
|
|
profile.get("display_name")
|
|
or profile.get("real_name")
|
|
or user.get("name")
|
|
or user_id
|
|
)
|
|
# Sanitize: replace spaces/special chars with underscores
|
|
sanitized = "".join(
|
|
c if c.isalnum() or c in "-_" else "_" for c in raw_username
|
|
)
|
|
self._username_cache[user_id] = sanitized
|
|
return sanitized
|
|
except SlackApiError:
|
|
self._username_cache[user_id] = user_id
|
|
return user_id
|
|
|
|
@staticmethod
|
|
def _strip_mention(text: str) -> str:
|
|
"""Remove bot mention from text (e.g., '<@U12345> hello' -> 'hello')."""
|
|
return re.sub(r"<@[A-Z0-9]+>", "", text).strip()
|