"""
Advanced Scheduled Tasks System with Agent/LLM integration.

Supports cron-like scheduling for tasks that require the Agent to execute,
with output delivery to messaging platforms (Slack, Telegram, etc.).

Example use cases:
- Daily weather reports at 8am and 6pm
- Weekly summary on Friday at 5pm
- Hourly health checks
- Custom periodic agent tasks
"""

import asyncio
import threading
import traceback
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

import yaml

from agent import Agent

# Mapping of day abbreviations to weekday numbers (Monday=0)
_DAY_NAMES = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]

# Scheduler polling interval in seconds
_SCHEDULER_POLL_INTERVAL = 60

# Maximum time in seconds a task thread waits for delivery on the bot's loop
_SEND_TIMEOUT = 30


@dataclass
class ScheduledTask:
    """Defines a scheduled task that uses the Agent."""

    name: str
    prompt: str
    schedule: str  # "daily 08:00", "hourly", "weekly mon 09:00"
    enabled: bool = True
    username: str = "scheduler"

    # Optional: Send output to messaging platform
    send_to_platform: Optional[str] = None
    send_to_channel: Optional[str] = None

    # Tracking
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None


class TaskScheduler:
    """
    Manages scheduled tasks that require Agent/LLM execution.

    Unlike the simple heartbeat, this:
    - Supports cron-like scheduling (specific times)
    - Can send outputs to messaging platforms
    - Tracks task execution history
    - Allows dynamic task management

    Public attributes the runtime may set after construction:
    - ``adapters``: platform name -> adapter used for delivery
    - ``on_task_complete``: callback invoked with ``(task, response)``
    - ``loop``: the event loop the adapters run on (see ``start``)
    """

    def __init__(
        self,
        agent: Agent,
        config_file: Optional[str] = None,
    ) -> None:
        """Create the scheduler and load tasks from the YAML config.

        Args:
            agent: Agent whose ``chat`` method executes task prompts.
            config_file: Path to the YAML task file; defaults to
                ``config/scheduled_tasks.yaml``.
        """
        self.agent = agent
        self.config_file = Path(
            config_file or "config/scheduled_tasks.yaml"
        )
        self.tasks: List[ScheduledTask] = []
        self.running = False
        self.thread: Optional[threading.Thread] = None

        # Set by stop() so the polling loop wakes up immediately instead of
        # sleeping out the remainder of the poll interval.
        self._stop_event = threading.Event()

        # Adapter integration (set by runtime)
        self.adapters: Dict[str, Any] = {}
        self.on_task_complete: Optional[
            Callable[[ScheduledTask, str], None]
        ] = None

        # Event loop used to deliver task output. Captured by start() when
        # it is called from inside a running loop; may also be assigned
        # directly by the runtime.
        self.loop: Optional[asyncio.AbstractEventLoop] = None

        # Strong references to in-flight delivery tasks created on the event
        # loop, so they are not garbage-collected before they finish.
        self._pending_sends: set = set()

        # Track file modification time for auto-reload
        self._last_mtime: Optional[float] = None

        self._load_tasks()

    def _load_tasks(self) -> None:
        """Load scheduled tasks from YAML config.

        If the config file does not exist, a default one is written and no
        tasks are loaded by this call. Otherwise ``self.tasks`` is replaced
        (in place) with the tasks found in the file. Entries that are
        malformed or carry an unparseable schedule are skipped with a log
        line so that one bad entry does not discard the rest. ``last_run``
        is carried over for tasks whose name already existed.
        """
        if not self.config_file.exists():
            self._create_default_config()
            return

        # Track file modification time
        self._last_mtime = self.config_file.stat().st_mtime

        with open(self.config_file, encoding="utf-8") as f:
            config = yaml.safe_load(f) or {}

        # "tasks:" with no value parses to None; a non-mapping document has
        # no "tasks" key at all. Both are treated as "no tasks".
        task_configs = (
            config.get("tasks") if isinstance(config, dict) else None
        ) or []

        previous_runs = {t.name: t.last_run for t in self.tasks}
        loaded: List[ScheduledTask] = []

        for task_config in task_configs:
            try:
                task = ScheduledTask(
                    name=task_config["name"],
                    prompt=task_config["prompt"],
                    schedule=task_config["schedule"],
                    enabled=task_config.get("enabled", True),
                    username=task_config.get("username", "scheduler"),
                    send_to_platform=task_config.get("send_to_platform"),
                    send_to_channel=task_config.get("send_to_channel"),
                )
                task.next_run = self._calculate_next_run(task.schedule)
                task.last_run = previous_runs.get(task.name)
            except (KeyError, TypeError, ValueError, AttributeError) as e:
                print(
                    f"[Scheduler] Skipping invalid task entry "
                    f"{task_config!r}: {e!r}"
                )
                continue
            loaded.append(task)

        # Single slice assignment: keeps the list object's identity and
        # avoids exposing a half-populated list during a reload.
        self.tasks[:] = loaded

        print(f"[Scheduler] Loaded {len(self.tasks)} task(s)")

    def _create_default_config(self) -> None:
        """Create default scheduled tasks config.

        Writes three example tasks (all disabled) to ``self.config_file``,
        creating parent directories as needed.
        """
        default_config = {
            "tasks": [
                {
                    "name": "morning-briefing",
                    "prompt": (
                        "Good morning! Please provide a brief summary "
                        "of:\n1. Any pending tasks\n"
                        "2. Today's priorities\n"
                        "3. A motivational message to start the day"
                    ),
                    "schedule": "daily 08:00",
                    "enabled": False,
                    "send_to_platform": None,
                    "send_to_channel": None,
                },
                {
                    "name": "evening-summary",
                    "prompt": (
                        "Good evening! Please provide:\n"
                        "1. Summary of what was accomplished today\n"
                        "2. Any tasks still pending\n"
                        "3. Preview of tomorrow's priorities"
                    ),
                    "schedule": "daily 18:00",
                    "enabled": False,
                },
                {
                    "name": "weekly-review",
                    "prompt": (
                        "It's the end of the week! Please provide:\n"
                        "1. Week highlights and accomplishments\n"
                        "2. Lessons learned\n"
                        "3. Goals for next week"
                    ),
                    "schedule": "weekly fri 17:00",
                    "enabled": False,
                },
            ]
        }

        self.config_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.config_file, "w", encoding="utf-8") as f:
            yaml.dump(
                default_config,
                f,
                default_flow_style=False,
                sort_keys=False,
            )

        print(f"[Scheduler] Created default config at {self.config_file}")

    def _calculate_next_run(
        self, schedule: str, now: Optional[datetime] = None
    ) -> datetime:
        """Calculate next run time from schedule string.

        Supported formats (case-insensitive):
        - ``hourly``: the next top of the hour
        - ``daily HH:MM``: the next occurrence of that time
        - ``weekly <day> HH:MM``: ``<day>`` is one of ``mon`` .. ``sun``

        Args:
            schedule: Schedule string in one of the formats above.
            now: Reference time; defaults to ``datetime.now()`` (naive,
                local time).

        Returns:
            The first matching datetime strictly after ``now``.

        Raises:
            ValueError: If the schedule is empty, has an unknown format or
                day name, or carries an unparseable/out-of-range time.
        """
        if now is None:
            now = datetime.now()

        parts = schedule.lower().split()
        if not parts:
            # Without this guard an empty string raised IndexError below.
            raise ValueError(f"Invalid schedule: {schedule!r}")

        kind = parts[0]

        if kind == "hourly":
            return now.replace(
                minute=0, second=0, microsecond=0
            ) + timedelta(hours=1)

        if kind == "daily":
            if len(parts) < 2:
                raise ValueError(f"Invalid schedule: {schedule}")

            hour, minute = map(int, parts[1].split(":"))
            next_run = now.replace(
                hour=hour, minute=minute, second=0, microsecond=0
            )
            # Today's slot already passed (or is exactly now): use tomorrow
            if next_run <= now:
                next_run += timedelta(days=1)
            return next_run

        if kind == "weekly":
            if len(parts) < 3:
                raise ValueError(f"Invalid schedule: {schedule}")

            try:
                target_day = _DAY_NAMES.index(parts[1])
            except ValueError:
                raise ValueError(
                    f"Invalid schedule: {schedule} "
                    f"(unknown day '{parts[1]}', expected one of "
                    f"{', '.join(_DAY_NAMES)})"
                ) from None

            hour, minute = map(int, parts[2].split(":"))

            # Days until the target weekday, in the range 0..6
            days_ahead = (target_day - now.weekday()) % 7

            next_run = now + timedelta(days=days_ahead)
            next_run = next_run.replace(
                hour=hour, minute=minute, second=0, microsecond=0
            )

            # If same day but time already passed, advance to next week
            if next_run <= now:
                next_run += timedelta(days=7)

            return next_run

        raise ValueError(f"Unknown schedule format: {schedule}")

    def reload_tasks(self) -> bool:
        """Reload tasks from config file if it has changed.

        Change detection compares the file's current mtime with the one
        recorded at the last load.

        Returns:
            True if tasks were reloaded, False if no changes detected.
        """
        if not self.config_file.exists():
            return False

        current_mtime = self.config_file.stat().st_mtime

        if self._last_mtime is not None and current_mtime == self._last_mtime:
            return False

        print("[Scheduler] Config file changed, reloading tasks...")
        self._load_tasks()

        if self.running:
            print("[Scheduler] Updated task schedule:")
            for task in self.tasks:
                if task.enabled and task.next_run:
                    formatted = task.next_run.strftime("%Y-%m-%d %H:%M")
                    print(f" - {task.name}: next run at {formatted}")

        return True

    def add_adapter(self, platform: str, adapter: Any) -> None:
        """Register an adapter for sending task outputs."""
        self.adapters[platform] = adapter
        print(f"[Scheduler] Registered adapter: {platform}")

    def start(self) -> None:
        """Start the scheduler in a background thread.

        Does nothing if the scheduler is already running. When called from
        inside a running asyncio event loop and ``self.loop`` has not been
        set, that loop is remembered and later used to deliver task output.
        """
        if self.running:
            return

        if self.loop is None:
            try:
                self.loop = asyncio.get_running_loop()
            except RuntimeError:
                # Started from synchronous code; delivery falls back to a
                # temporary loop unless the runtime assigns self.loop.
                pass

        self._stop_event.clear()
        self.running = True
        self.thread = threading.Thread(
            target=self._run_scheduler_loop, daemon=True
        )
        self.thread.start()

        print(f"[Scheduler] Started with {len(self.tasks)} task(s)")
        for task in self.tasks:
            if task.enabled and task.next_run:
                formatted = task.next_run.strftime("%Y-%m-%d %H:%M")
                print(
                    f" - {task.name}: next run at {formatted}"
                )

    def stop(self) -> None:
        """Stop the scheduler.

        Wakes the polling loop via the stop event and waits for the
        scheduler thread to exit. Task threads that are already running
        are not interrupted.
        """
        self.running = False
        self._stop_event.set()
        if self.thread:
            self.thread.join()
        print("[Scheduler] Stopped")

    def _run_scheduler_loop(self) -> None:
        """Main scheduler loop (runs in background thread).

        Each iteration reloads the config if it changed, starts a daemon
        thread for every enabled task that is due, and reschedules it.
        Errors are logged and the loop keeps running.
        """
        while self.running:
            try:
                # Auto-reload tasks if config file changed
                self.reload_tasks()

                now = datetime.now()

                for task in self.tasks:
                    if not task.enabled:
                        continue

                    if task.next_run and now >= task.next_run:
                        print(
                            f"[Scheduler] Executing task: {task.name}"
                        )

                        threading.Thread(
                            target=self._execute_task,
                            args=(task,),
                            daemon=True,
                        ).start()

                        task.last_run = now
                        task.next_run = self._calculate_next_run(
                            task.schedule
                        )
                        formatted = task.next_run.strftime(
                            "%Y-%m-%d %H:%M"
                        )
                        print(
                            f"[Scheduler] Next run for {task.name}: "
                            f"{formatted}"
                        )

            except Exception as e:
                print(f"[Scheduler] Error in scheduler loop: {e}")
                traceback.print_exc()

            # Sleep for one poll interval, returning early once stop() sets
            # the event so shutdown is not delayed by the interval.
            self._stop_event.wait(_SCHEDULER_POLL_INTERVAL)

    def _execute_task(self, task: ScheduledTask) -> None:
        """Execute a single task using the Agent.

        Sends ``task.prompt`` to ``agent.chat``, optionally delivers the
        response to the configured platform/channel, then invokes
        ``on_task_complete`` if set. All exceptions are caught and logged.

        Note: the original author states agent.chat() is thread-safe (uses
        an internal lock); that cannot be verified from this file.
        """
        try:
            print(f"[Scheduler] Running: {task.name}")

            response = self.agent.chat(
                user_message=task.prompt,
                username=task.username,
            )

            print(f"[Scheduler] Task completed: {task.name}")
            print(f" Response: {response[:100]}...")

            if task.send_to_platform and task.send_to_channel:
                self._dispatch_send(task, response)

            if self.on_task_complete:
                self.on_task_complete(task, response)

        except Exception as e:
            print(f"[Scheduler] Task failed: {task.name}")
            print(f" Error: {e}")
            traceback.print_exc()

    def _dispatch_send(self, task: ScheduledTask, response: str) -> None:
        """Run ``_send_to_platform`` on the appropriate event loop.

        Three situations are handled:
        1. Called on a thread that is itself running an event loop: the
           send is scheduled as a task on that loop and this method returns
           immediately, because blocking here would stall the loop that
           has to run the coroutine.
        2. Called from a worker thread while ``self.loop`` is running: the
           coroutine is submitted to that loop and awaited for up to
           ``_SEND_TIMEOUT`` seconds.
        3. No usable loop (e.g. standalone test mode): a temporary loop is
           created with ``asyncio.run``.
        """
        try:
            current_loop = asyncio.get_running_loop()
        except RuntimeError:
            # Normal case for scheduler/task threads: no loop on this thread
            current_loop = None

        if current_loop is not None:
            pending = current_loop.create_task(
                self._send_to_platform(task, response)
            )
            self._pending_sends.add(pending)
            pending.add_done_callback(self._on_send_done)
            return

        bot_loop = self.loop
        if bot_loop is not None and bot_loop.is_running():
            future = asyncio.run_coroutine_threadsafe(
                self._send_to_platform(task, response), bot_loop
            )
            future.result(timeout=_SEND_TIMEOUT)
            return

        asyncio.run(self._send_to_platform(task, response))

    def _on_send_done(self, pending: "asyncio.Task[None]") -> None:
        """Release a finished delivery task and log any error it raised."""
        self._pending_sends.discard(pending)
        if pending.cancelled():
            return
        error = pending.exception()
        if error is not None:
            print(f"[Scheduler] Failed to send: {error}")

    async def _send_to_platform(
        self, task: ScheduledTask, response: str
    ) -> None:
        """Send task output to messaging platform.

        Looks up the adapter registered for ``task.send_to_platform`` and
        sends the response, prefixed with the task name, to
        ``task.send_to_channel``. Logs and returns if no adapter is
        registered for the platform.
        """
        adapter = self.adapters.get(task.send_to_platform)
        if not adapter:
            print(
                f"[Scheduler] Adapter not found: "
                f"{task.send_to_platform}"
            )
            return

        # Imported at call time, as in the original code.
        from adapters.base import OutboundMessage

        message = OutboundMessage(
            platform=task.send_to_platform,
            channel_id=task.send_to_channel,
            text=f"**{task.name}**\n\n{response}",
        )

        result = await adapter.send_message(message)

        if result.get("success"):
            print(
                f"[Scheduler] Sent to "
                f"{task.send_to_platform}:{task.send_to_channel}"
            )
        else:
            print(
                f"[Scheduler] Failed to send: {result.get('error')}"
            )

    def list_tasks(self) -> List[Dict[str, Any]]:
        """Get status of all tasks.

        Returns:
            One dict per task with the keys ``name``, ``schedule``,
            ``enabled``, ``next_run`` and ``last_run`` (ISO strings or
            None) and ``send_to`` (``"platform:channel"`` or None).
        """
        result = []
        for task in self.tasks:
            send_to = None
            if task.send_to_platform:
                send_to = (
                    f"{task.send_to_platform}:{task.send_to_channel}"
                )

            result.append({
                "name": task.name,
                "schedule": task.schedule,
                "enabled": task.enabled,
                "next_run": (
                    task.next_run.isoformat()
                    if task.next_run
                    else None
                ),
                "last_run": (
                    task.last_run.isoformat()
                    if task.last_run
                    else None
                ),
                "send_to": send_to,
            })
        return result

    def run_task_now(self, task_name: str) -> str:
        """Manually trigger a task immediately.

        Runs the task synchronously on the calling thread, regardless of
        its ``enabled`` flag, and does not update ``last_run``/``next_run``.

        Returns:
            A status message; ``"Task not found: <name>"`` if no task has
            that name. Failures inside the task are logged by
            ``_execute_task`` and are not reflected in the return value.
        """
        task = next(
            (t for t in self.tasks if t.name == task_name), None
        )
        if not task:
            return f"Task not found: {task_name}"

        print(f"[Scheduler] Manual execution: {task_name}")
        self._execute_task(task)
        return f"Task '{task_name}' executed"


def integrate_scheduler_with_runtime(
    runtime: Any,
    agent: Agent,
    config_file: Optional[str] = None,
) -> TaskScheduler:
    """
    Integrate scheduled tasks with the bot runtime.

    Creates a TaskScheduler and registers every adapter returned by
    ``runtime.registry.get_all()`` under its ``platform_name``. The
    scheduler is returned without being started.

    Usage in bot_runner.py:
        scheduler = integrate_scheduler_with_runtime(runtime, agent)
        scheduler.start()
    """
    scheduler = TaskScheduler(agent, config_file)

    for adapter in runtime.registry.get_all():
        scheduler.add_adapter(adapter.platform_name, adapter)

    return scheduler


if __name__ == "__main__":
    agent = Agent(
        provider="claude", workspace_dir="./memory_workspace"
    )
    scheduler = TaskScheduler(
        agent, config_file="config/scheduled_tasks.yaml"
    )

    print("\nScheduled tasks:")
    for task_info in scheduler.list_tasks():
        print(
            f" {task_info['name']}: {task_info['schedule']} "
            f"(next: {task_info['next_run']})"
        )

    if scheduler.tasks:
        test_task = scheduler.tasks[0]
        print(f"\nTesting task: {test_task.name}")
        scheduler.run_task_now(test_task.name)