Files
ajarbot/scheduled_tasks.py

474 lines
16 KiB
Python
Raw Permalink Normal View History

"""
Advanced Scheduled Tasks System with Agent/LLM integration.
Supports cron-like scheduling for tasks that require the Agent to execute,
with output delivery to messaging platforms (Slack, Telegram, etc.).
Example use cases:
- Daily weather reports at 8am and 6pm
- Weekly summary on Friday at 5pm
- Hourly health checks
- Custom periodic agent tasks
"""
import asyncio
import threading
import traceback
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
import yaml
from agent import Agent
# Mapping of day abbreviations to weekday numbers (Monday=0)
_DAY_NAMES = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]
# Scheduler polling interval in seconds
_SCHEDULER_POLL_INTERVAL = 60
@dataclass
class ScheduledTask:
"""Defines a scheduled task that uses the Agent."""
name: str
prompt: str
schedule: str # "daily 08:00", "hourly", "weekly mon 09:00"
enabled: bool = True
username: str = "scheduler"
# Optional: Send output to messaging platform
send_to_platform: Optional[str] = None
send_to_channel: Optional[str] = None
# Tracking
last_run: Optional[datetime] = None
next_run: Optional[datetime] = None
class TaskScheduler:
"""
Manages scheduled tasks that require Agent/LLM execution.
Unlike the simple heartbeat, this:
- Supports cron-like scheduling (specific times)
- Can send outputs to messaging platforms
- Tracks task execution history
- Allows dynamic task management
"""
def __init__(
self,
agent: Agent,
config_file: Optional[str] = None,
) -> None:
self.agent = agent
self.config_file = Path(
config_file or "config/scheduled_tasks.yaml"
)
self.tasks: List[ScheduledTask] = []
self.running = False
self.thread: Optional[threading.Thread] = None
# Adapter integration (set by runtime)
self.adapters: Dict[str, Any] = {}
self.on_task_complete: Optional[
Callable[[ScheduledTask, str], None]
] = None
# Track file modification time for auto-reload
self._last_mtime: Optional[float] = None
self._load_tasks()
def _load_tasks(self) -> None:
"""Load scheduled tasks from YAML config."""
if not self.config_file.exists():
self._create_default_config()
return
# Track file modification time
self._last_mtime = self.config_file.stat().st_mtime
with open(self.config_file, encoding="utf-8") as f:
config = yaml.safe_load(f) or {}
self.tasks.clear() # Clear existing tasks before reload
for task_config in config.get("tasks", []):
task = ScheduledTask(
name=task_config["name"],
prompt=task_config["prompt"],
schedule=task_config["schedule"],
enabled=task_config.get("enabled", True),
username=task_config.get("username", "scheduler"),
send_to_platform=task_config.get("send_to_platform"),
send_to_channel=task_config.get("send_to_channel"),
)
task.next_run = self._calculate_next_run(task.schedule)
self.tasks.append(task)
print(f"[Scheduler] Loaded {len(self.tasks)} task(s)")
def _create_default_config(self) -> None:
"""Create default scheduled tasks config."""
default_config = {
"tasks": [
{
"name": "morning-briefing",
"prompt": (
"Good morning! Please provide a brief summary "
"of:\n1. Any pending tasks\n"
"2. Today's priorities\n"
"3. A motivational message to start the day"
),
"schedule": "daily 08:00",
"enabled": False,
"send_to_platform": None,
"send_to_channel": None,
},
{
"name": "evening-summary",
"prompt": (
"Good evening! Please provide:\n"
"1. Summary of what was accomplished today\n"
"2. Any tasks still pending\n"
"3. Preview of tomorrow's priorities"
),
"schedule": "daily 18:00",
"enabled": False,
},
{
"name": "weekly-review",
"prompt": (
"It's the end of the week! Please provide:\n"
"1. Week highlights and accomplishments\n"
"2. Lessons learned\n"
"3. Goals for next week"
),
"schedule": "weekly fri 17:00",
"enabled": False,
},
]
}
self.config_file.parent.mkdir(parents=True, exist_ok=True)
with open(self.config_file, "w", encoding="utf-8") as f:
yaml.dump(
default_config, f,
default_flow_style=False,
sort_keys=False,
)
print(f"[Scheduler] Created default config at {self.config_file}")
def _calculate_next_run(self, schedule: str) -> datetime:
"""Calculate next run time from schedule string."""
now = datetime.now()
parts = schedule.lower().split()
if parts[0] == "hourly":
return now.replace(
minute=0, second=0, microsecond=0
) + timedelta(hours=1)
if parts[0] == "daily":
if len(parts) < 2:
raise ValueError(f"Invalid schedule: {schedule}")
hour, minute = map(int, parts[1].split(":"))
next_run = now.replace(
hour=hour, minute=minute, second=0, microsecond=0
)
if next_run <= now:
next_run += timedelta(days=1)
return next_run
if parts[0] == "weekly":
if len(parts) < 3:
raise ValueError(f"Invalid schedule: {schedule}")
target_day = _DAY_NAMES.index(parts[1])
hour, minute = map(int, parts[2].split(":"))
days_ahead = target_day - now.weekday()
if days_ahead < 0:
days_ahead += 7
next_run = now + timedelta(days=days_ahead)
next_run = next_run.replace(
hour=hour, minute=minute, second=0, microsecond=0
)
# If same day but time already passed, advance to next week
if next_run <= now:
next_run += timedelta(days=7)
return next_run
raise ValueError(f"Unknown schedule format: {schedule}")
def reload_tasks(self) -> bool:
"""Reload tasks from config file if it has changed.
Returns:
True if tasks were reloaded, False if no changes detected.
"""
if not self.config_file.exists():
return False
current_mtime = self.config_file.stat().st_mtime
if self._last_mtime is not None and current_mtime == self._last_mtime:
return False
print(f"[Scheduler] Config file changed, reloading tasks...")
self._load_tasks()
if self.running:
print("[Scheduler] Updated task schedule:")
for task in self.tasks:
if task.enabled and task.next_run:
formatted = task.next_run.strftime("%Y-%m-%d %H:%M")
print(f" - {task.name}: next run at {formatted}")
return True
def add_adapter(self, platform: str, adapter: Any) -> None:
"""Register an adapter for sending task outputs."""
self.adapters[platform] = adapter
print(f"[Scheduler] Registered adapter: {platform}")
def start(self) -> None:
"""Start the scheduler in a background thread."""
if self.running:
return
self.running = True
self.thread = threading.Thread(
target=self._run_scheduler_loop, daemon=True
)
self.thread.start()
print(f"[Scheduler] Started with {len(self.tasks)} task(s)")
for task in self.tasks:
if task.enabled and task.next_run:
formatted = task.next_run.strftime("%Y-%m-%d %H:%M")
print(
f" - {task.name}: next run at {formatted}"
)
def stop(self) -> None:
"""Stop the scheduler."""
self.running = False
if self.thread:
self.thread.join()
print("[Scheduler] Stopped")
def _run_scheduler_loop(self) -> None:
"""Main scheduler loop (runs in background thread)."""
while self.running:
try:
# Auto-reload tasks if config file changed
self.reload_tasks()
now = datetime.now()
for task in self.tasks:
if not task.enabled:
continue
if task.next_run and now >= task.next_run:
print(
f"[Scheduler] Executing task: {task.name}"
)
threading.Thread(
target=self._execute_task,
args=(task,),
daemon=True,
).start()
task.last_run = now
task.next_run = self._calculate_next_run(
task.schedule
)
formatted = task.next_run.strftime(
"%Y-%m-%d %H:%M"
)
print(
f"[Scheduler] Next run for {task.name}: "
f"{formatted}"
)
except Exception as e:
print(f"[Scheduler] Error in scheduler loop: {e}")
traceback.print_exc()
threading.Event().wait(_SCHEDULER_POLL_INTERVAL)
def _execute_task(self, task: ScheduledTask) -> None:
"""Execute a single task using the Agent.
Note: agent.chat() is thread-safe (uses internal lock), so this
can safely run from the scheduler's background thread.
"""
try:
print(f"[Scheduler] Running: {task.name}")
response = self.agent.chat(
user_message=task.prompt,
username=task.username,
)
print(f"[Scheduler] Task completed: {task.name}")
print(f" Response: {response[:100]}...")
if task.send_to_platform and task.send_to_channel:
# Use the running event loop if available, otherwise create one.
# asyncio.run() fails if an event loop is already running
# (which it is when the bot is active).
try:
loop = asyncio.get_running_loop()
# Schedule on the existing loop from this background thread
future = asyncio.run_coroutine_threadsafe(
self._send_to_platform(task, response), loop
)
future.result(timeout=30) # Wait up to 30s
except RuntimeError:
# No running loop (e.g., standalone test mode)
asyncio.run(self._send_to_platform(task, response))
if self.on_task_complete:
self.on_task_complete(task, response)
except Exception as e:
print(f"[Scheduler] Task failed: {task.name}")
print(f" Error: {e}")
traceback.print_exc()
if self.agent and hasattr(self.agent, 'healing_system'):
self.agent.healing_system.capture_error(
error=e,
component="scheduled_tasks.py:_execute_task",
intent=f"Executing scheduled task: {task.name}",
context={
"task_name": task.name,
"schedule": task.schedule,
"prompt": task.prompt[:100],
},
)
async def _send_to_platform(
self, task: ScheduledTask, response: str
) -> None:
"""Send task output to messaging platform."""
adapter = self.adapters.get(task.send_to_platform)
if not adapter:
print(
f"[Scheduler] Adapter not found: "
f"{task.send_to_platform}"
)
return
from adapters.base import OutboundMessage
message = OutboundMessage(
platform=task.send_to_platform,
channel_id=task.send_to_channel,
text=f"**{task.name}**\n\n{response}",
)
result = await adapter.send_message(message)
if result.get("success"):
print(
f"[Scheduler] Sent to "
f"{task.send_to_platform}:{task.send_to_channel}"
)
else:
print(
f"[Scheduler] Failed to send: {result.get('error')}"
)
def list_tasks(self) -> List[Dict[str, Any]]:
"""Get status of all tasks."""
result = []
for task in self.tasks:
send_to = None
if task.send_to_platform:
send_to = (
f"{task.send_to_platform}:{task.send_to_channel}"
)
result.append({
"name": task.name,
"schedule": task.schedule,
"enabled": task.enabled,
"next_run": (
task.next_run.isoformat()
if task.next_run
else None
),
"last_run": (
task.last_run.isoformat()
if task.last_run
else None
),
"send_to": send_to,
})
return result
def run_task_now(self, task_name: str) -> str:
"""Manually trigger a task immediately."""
task = next(
(t for t in self.tasks if t.name == task_name), None
)
if not task:
return f"Task not found: {task_name}"
print(f"[Scheduler] Manual execution: {task_name}")
self._execute_task(task)
return f"Task '{task_name}' executed"
def integrate_scheduler_with_runtime(
runtime: Any,
agent: Agent,
config_file: Optional[str] = None,
) -> TaskScheduler:
"""
Integrate scheduled tasks with the bot runtime.
Usage in bot_runner.py:
scheduler = integrate_scheduler_with_runtime(runtime, agent)
scheduler.start()
"""
scheduler = TaskScheduler(agent, config_file)
for adapter in runtime.registry.get_all():
scheduler.add_adapter(adapter.platform_name, adapter)
return scheduler
if __name__ == "__main__":
agent = Agent(
provider="claude", workspace_dir="./memory_workspace"
)
scheduler = TaskScheduler(
agent, config_file="config/scheduled_tasks.yaml"
)
print("\nScheduled tasks:")
for task_info in scheduler.list_tasks():
print(
f" {task_info['name']}: {task_info['schedule']} "
f"(next: {task_info['next_run']})"
)
if scheduler.tasks:
test_task = scheduler.tasks[0]
print(f"\nTesting task: {test_task.name}")
scheduler.run_task_now(test_task.name)