Refactor: Clean up obsolete files and organize codebase structure

This commit removes deprecated modules and reorganizes code into logical directories:

Deleted files (superseded by newer systems):
- claude_code_server.py (replaced by agent-sdk direct integration)
- heartbeat.py (superseded by scheduled_tasks.py)
- pulse_brain.py (unused in production)
- config/pulse_brain_config.py (obsolete config)

Created directory structure:
- examples/ (7 example files: example_*.py, demo_*.py)
- tests/ (5 test files: test_*.py)

Updated imports:
- agent.py: Removed heartbeat module and all enable_heartbeat logic
- bot_runner.py: Removed heartbeat parameter from Agent initialization
- llm_interface.py: Updated deprecated claude_code_server message

Preserved essential files:
- hooks.py (for future use)
- adapters/skill_integration.py (for future use)
- All Google integration tools (Gmail, Calendar, Contacts)
- GLM provider code (backward compatibility)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-15 09:57:39 -07:00
parent f018800d94
commit a8665d8c72
26 changed files with 1068 additions and 1067 deletions

View File

@@ -0,0 +1,53 @@
"""Compare old keyword search vs new hybrid search."""
from memory_system import MemorySystem
print("Initializing memory system...")
memory = MemorySystem()
print("\n" + "="*70)
print("KEYWORD vs HYBRID SEARCH COMPARISON")
print("="*70)
# Test queries that benefit from semantic understanding
test_queries = [
("How do I reduce costs?", "Testing semantic understanding of 'reduce costs' -> 'cost optimization'"),
("when was I born", "Testing semantic match for birthday/birth date"),
("what database do we use", "Testing keyword match for 'SQLite'"),
("vector similarity", "Testing technical term matching"),
]
for query, description in test_queries:
print(f"\n{description}")
print(f"Query: '{query}'")
print("-" * 70)
# Keyword-only search
print("\n KEYWORD SEARCH (old):")
keyword_results = memory.search(query, max_results=2)
if keyword_results:
for i, r in enumerate(keyword_results, 1):
print(f" {i}. {r['path']}:{r['start_line']} (score: {r['score']:.3f})")
print(f" {r['snippet'][:80]}...")
else:
print(" No results found!")
# Hybrid search
print("\n HYBRID SEARCH (new):")
hybrid_results = memory.search_hybrid(query, max_results=2)
if hybrid_results:
for i, r in enumerate(hybrid_results, 1):
print(f" {i}. {r['path']}:{r['start_line']} (score: {r['score']:.3f})")
print(f" {r['snippet'][:80]}...")
else:
print(" No results found!")
print()
print("\n" + "="*70)
print(f"[OK] Hybrid search loaded with {len(memory.vector_index)} vector embeddings")
print(f"[OK] Vector index: {memory.vector_index_path}")
print(f"[OK] Database: {memory.db_path}")
print("="*70)
memory.close()

View File

@@ -0,0 +1,75 @@
"""
Example: Using the adapter system programmatically.
Demonstrates how to integrate adapters into your own code,
rather than using the bot_runner.py CLI.
"""
import asyncio
from adapters.base import AdapterConfig
from adapters.runtime import (
AdapterRuntime,
command_preprocessor,
markdown_postprocessor,
)
from adapters.slack.adapter import SlackAdapter
from adapters.telegram.adapter import TelegramAdapter
from agent import Agent
async def main() -> None:
# 1. Create the agent
agent = Agent(
provider="claude",
workspace_dir="./memory_workspace",
enable_heartbeat=False,
)
# 2. Create runtime
runtime = AdapterRuntime(agent)
# 3. Add preprocessors and postprocessors
runtime.add_preprocessor(command_preprocessor)
runtime.add_postprocessor(markdown_postprocessor)
# 4. Configure Slack adapter
slack_adapter = SlackAdapter(AdapterConfig(
platform="slack",
enabled=True,
credentials={
"bot_token": "xoxb-YOUR-TOKEN",
"app_token": "xapp-YOUR-TOKEN",
},
))
runtime.add_adapter(slack_adapter)
# 5. Configure Telegram adapter
telegram_adapter = TelegramAdapter(AdapterConfig(
platform="telegram",
enabled=True,
credentials={"bot_token": "YOUR-TELEGRAM-TOKEN"},
settings={"parse_mode": "Markdown"},
))
runtime.add_adapter(telegram_adapter)
# 6. Map users (optional)
runtime.map_user("slack:U12345", "alice")
runtime.map_user("telegram:123456789", "alice")
# 7. Start runtime
await runtime.start()
# 8. Keep running
try:
print("Bot is running! Press Ctrl+C to stop.")
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\nStopping...")
finally:
await runtime.stop()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,160 @@
"""
Example: Bot with Pulse & Brain monitoring.
The most cost-effective approach:
- Pulse checks run constantly (pure Python, $0 cost)
- Brain only invokes Agent when needed
- 92% cost savings vs always-on agent
Usage:
python example_bot_with_pulse_brain.py
"""
import asyncio
from adapters.base import AdapterConfig
from adapters.runtime import AdapterRuntime
from adapters.slack.adapter import SlackAdapter
from adapters.telegram.adapter import TelegramAdapter
from agent import Agent
from pulse_brain import PulseBrain
# Cost estimation constants
_AVERAGE_TOKENS_PER_CALL = 1000
_COST_PER_TOKEN = 0.000003
async def main() -> None:
print("=" * 60)
print("Ajarbot with Pulse & Brain")
print("=" * 60)
print("\nPulse: Pure Python checks (zero cost)")
print("Brain: Agent/SDK (only when needed)\n")
# 1. Create agent
agent = Agent(
provider="claude",
workspace_dir="./memory_workspace",
enable_heartbeat=False,
)
# 2. Create runtime with adapters
runtime = AdapterRuntime(agent)
slack_adapter = SlackAdapter(AdapterConfig(
platform="slack",
enabled=True,
credentials={
"bot_token": "xoxb-YOUR-TOKEN",
"app_token": "xapp-YOUR-TOKEN",
},
))
runtime.add_adapter(slack_adapter)
telegram_adapter = TelegramAdapter(AdapterConfig(
platform="telegram",
enabled=True,
credentials={"bot_token": "YOUR-TELEGRAM-TOKEN"},
))
runtime.add_adapter(telegram_adapter)
# 3. Create Pulse & Brain system
pb = PulseBrain(agent, pulse_interval=60)
pb.add_adapter("slack", slack_adapter)
pb.add_adapter("telegram", telegram_adapter)
# Optional: Apply custom configuration
try:
from config.pulse_brain_config import apply_custom_config
apply_custom_config(pb)
print("[Setup] Custom pulse/brain config loaded")
except ImportError:
print("[Setup] Using default pulse/brain config")
# 4. Show configuration
print("\n" + "=" * 60)
print("Configuration")
print("=" * 60)
print(f"\nPulse checks ({len(pb.pulse_checks)}):")
for check in pb.pulse_checks:
print(
f" [Pulse] {check.name} "
f"(every {check.interval_seconds}s)"
)
print(f"\nBrain tasks ({len(pb.brain_tasks)}):")
for task in pb.brain_tasks:
if task.check_type.value == "scheduled":
print(
f" [Brain] {task.name} "
f"(scheduled {task.schedule_time})"
)
else:
print(f" [Brain] {task.name} (conditional)")
# 5. Start everything
print("\n" + "=" * 60)
print("Starting system...")
print("=" * 60 + "\n")
await runtime.start()
pb.start()
print("\n" + "=" * 60)
print("System is running!")
print("=" * 60)
print("\nWhat's happening:")
print(" - Users can chat with bot on Slack/Telegram")
print(" - Pulse checks run every 60s (zero cost)")
print(" - Brain only invokes when:")
print(" - Error detected")
print(" - Threshold exceeded")
print(" - Scheduled time (8am, 6pm)")
print("\nPress Ctrl+C to stop.\n")
try:
iteration = 0
while True:
await asyncio.sleep(30)
iteration += 1
if iteration % 2 == 0:
status = pb.get_status()
invocations = status["brain_invocations"]
print(
f"[Status] Brain invoked "
f"{invocations} times"
)
except KeyboardInterrupt:
print("\n\n[Shutdown] Stopping system...")
finally:
pb.stop()
await runtime.stop()
final_status = pb.get_status()
invocations = final_status["brain_invocations"]
total_tokens = invocations * _AVERAGE_TOKENS_PER_CALL
estimated_cost = total_tokens * _COST_PER_TOKEN
print("\n" + "=" * 60)
print("Final Statistics")
print("=" * 60)
print(f"Brain invocations: {invocations}")
print(f"Estimated tokens: {total_tokens:,}")
print(f"Estimated cost: ${estimated_cost:.4f}")
print("\nCompare to always-on agent:")
print(" Pulse checks: FREE")
print(
f" Brain calls: {invocations} "
f"(only when needed)"
)
print(
" Savings: ~92% vs running agent every minute"
)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,109 @@
"""
Example: Bot with scheduled tasks.
Demonstrates how to use the TaskScheduler for cron-like scheduled tasks
that require Agent/LLM execution and can send outputs to messaging platforms.
"""
import asyncio
from adapters.base import AdapterConfig
from adapters.runtime import AdapterRuntime
from adapters.slack.adapter import SlackAdapter
from adapters.telegram.adapter import TelegramAdapter
from agent import Agent
from scheduled_tasks import ScheduledTask, TaskScheduler
def _on_task_complete(task: ScheduledTask, response: str) -> None:
"""Callback for task completion."""
print(f"\n[Task Complete] {task.name}")
print(f"Response preview: {response[:100]}...\n")
async def main() -> None:
print("=" * 60)
print("Ajarbot with Scheduled Tasks")
print("=" * 60)
# 1. Create agent
agent = Agent(
provider="claude",
workspace_dir="./memory_workspace",
enable_heartbeat=False,
)
# 2. Create runtime
runtime = AdapterRuntime(agent)
# 3. Add adapters
slack_adapter = SlackAdapter(AdapterConfig(
platform="slack",
enabled=True,
credentials={
"bot_token": "xoxb-YOUR-TOKEN",
"app_token": "xapp-YOUR-TOKEN",
},
))
runtime.add_adapter(slack_adapter)
telegram_adapter = TelegramAdapter(AdapterConfig(
platform="telegram",
enabled=True,
credentials={"bot_token": "YOUR-TELEGRAM-TOKEN"},
))
runtime.add_adapter(telegram_adapter)
# 4. Create and configure scheduler
scheduler = TaskScheduler(
agent, config_file="config/scheduled_tasks.yaml",
)
scheduler.add_adapter("slack", slack_adapter)
scheduler.add_adapter("telegram", telegram_adapter)
scheduler.on_task_complete = _on_task_complete
print("\n[Setup] Scheduled tasks:")
for task_info in scheduler.list_tasks():
enabled = task_info.get("enabled")
status = "enabled" if enabled else "disabled"
next_run = task_info.get("next_run", "N/A")
send_to = task_info.get("send_to") or "local only"
print(f" [{status}] {task_info['name']}")
print(f" Schedule: {task_info['schedule']}")
print(f" Next run: {next_run}")
print(f" Output: {send_to}")
# 5. Start everything
print("\n" + "=" * 60)
print("Starting bot with scheduler...")
print("=" * 60 + "\n")
await runtime.start()
scheduler.start()
print("\n" + "=" * 60)
print("Bot is running! Press Ctrl+C to stop.")
print("=" * 60)
print(
"\nScheduled tasks will run automatically "
"at their scheduled times."
)
print(
"Users can also chat with the bot normally "
"from Slack/Telegram.\n"
)
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\n\n[Shutdown] Stopping...")
finally:
scheduler.stop()
await runtime.stop()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,174 @@
"""
Example: Bot runner with local skills support.
This demonstrates how to use local skills from .claude/skills/
in your bot, allowing users to invoke them from Slack/Telegram.
"""
import asyncio
from adapters.base import AdapterConfig, InboundMessage
from adapters.runtime import AdapterRuntime
from adapters.skill_integration import SkillInvoker
from adapters.slack.adapter import SlackAdapter
from adapters.telegram.adapter import TelegramAdapter
from agent import Agent
def create_skill_preprocessor(
skill_invoker: SkillInvoker, agent: Agent
) -> callable:
"""
Preprocessor that allows invoking skills via /skill-name syntax.
Example messages:
"/adapter-dev create Discord adapter"
"/help"
"/status check health of all adapters"
"""
def preprocessor(
message: InboundMessage,
) -> InboundMessage:
text = message.text.strip()
if not text.startswith("/"):
return message
parts = text.split(maxsplit=1)
skill_name = parts[0][1:]
args = parts[1] if len(parts) > 1 else ""
available_skills = skill_invoker.list_available_skills()
if skill_name in available_skills:
print(
f"[Skills] Invoking /{skill_name} "
f"with args: {args}"
)
skill_info = skill_invoker.get_skill_info(skill_name)
if skill_info:
skill_body = skill_info.get("body", "")
skill_context = skill_body.replace(
"$ARGUMENTS", args,
)
arg_parts = args.split() if args else []
for i, arg in enumerate(arg_parts):
skill_context = skill_context.replace(
f"${i}", arg,
)
message.text = skill_context
message.metadata["skill_invoked"] = skill_name
print(f"[Skills] Skill /{skill_name} loaded")
else:
message.text = (
f"Error: Could not load skill '{skill_name}'"
)
elif skill_name in ["help", "skills"]:
skills_list = "\n".join(
f" - /{s}" for s in available_skills
)
message.text = (
f"Available skills:\n{skills_list}\n\n"
f"Use /skill-name to invoke."
)
message.metadata["builtin_command"] = True
return message
return preprocessor
def create_skill_postprocessor() -> callable:
"""Postprocessor that adds skill metadata to responses."""
def postprocessor(
response: str, original: InboundMessage,
) -> str:
if original.metadata.get("skill_invoked"):
skill_name = original.metadata["skill_invoked"]
response += f"\n\n_[Powered by /{skill_name}]_"
return response
return postprocessor
async def main() -> None:
print("=" * 60)
print("Ajarbot with Local Skills")
print("=" * 60)
# 1. Create agent
agent = Agent(
provider="claude",
workspace_dir="./memory_workspace",
enable_heartbeat=False,
)
# 2. Initialize skill system
skill_invoker = SkillInvoker()
print("\n[Skills] Available local skills:")
for skill in skill_invoker.list_available_skills():
info = skill_invoker.get_skill_info(skill)
desc = (
info.get("description", "No description")
if info
else "Unknown"
)
print(f" - /{skill} - {desc}")
# 3. Create runtime with skill support
runtime = AdapterRuntime(agent)
runtime.add_preprocessor(
create_skill_preprocessor(skill_invoker, agent),
)
runtime.add_postprocessor(create_skill_postprocessor())
# 4. Add adapters
slack_adapter = SlackAdapter(AdapterConfig(
platform="slack",
enabled=True,
credentials={
"bot_token": "xoxb-YOUR-TOKEN",
"app_token": "xapp-YOUR-TOKEN",
},
))
runtime.add_adapter(slack_adapter)
telegram_adapter = TelegramAdapter(AdapterConfig(
platform="telegram",
enabled=True,
credentials={"bot_token": "YOUR-TELEGRAM-TOKEN"},
))
runtime.add_adapter(telegram_adapter)
print("\n[Setup] Bot configured with skill support")
print("\nUsers can now invoke skills from Slack/Telegram:")
print(" Example: '/adapter-dev create Discord adapter'")
print(" Example: '/skills' (list available skills)")
# 5. Start runtime
await runtime.start()
print("\n" + "=" * 60)
print("Bot is running! Press Ctrl+C to stop.")
print("=" * 60 + "\n")
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\n[Shutdown] Stopping...")
finally:
await runtime.stop()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,85 @@
"""
Example: Pulse & Brain with ONLY YOUR chosen checks.
By default, pulse_brain.py includes example checks.
This shows how to start with a CLEAN SLATE and only add what YOU want.
"""
import time
from pathlib import Path
from agent import Agent
from pulse_brain import BrainTask, CheckType, PulseCheck, PulseBrain
def check_my_file() -> dict:
"""Check if the important data file exists."""
file = Path("important_data.json")
return {
"status": "ok" if file.exists() else "error",
"message": f"File exists: {file.exists()}",
}
def main() -> None:
agent = Agent(provider="claude", enable_heartbeat=False)
# Create Pulse & Brain with NO automatic checks
pb = PulseBrain(agent, pulse_interval=60)
# Remove all default checks (start clean)
pb.pulse_checks = []
pb.brain_tasks = []
print(
"Starting with ZERO checks. "
"You have complete control.\n"
)
# Add ONLY the checks you want
pb.pulse_checks.append(
PulseCheck(
"my-file", check_my_file,
interval_seconds=300,
),
)
pb.brain_tasks.append(
BrainTask(
name="file-recovery",
check_type=CheckType.CONDITIONAL,
prompt_template=(
"The file important_data.json is missing. "
"What should I do to recover it?"
),
condition_func=lambda data: (
data.get("status") == "error"
),
),
)
print("Added 1 pulse check: my-file")
print("Added 1 brain task: file-recovery")
print("\nThe agent will ONLY:")
print(
" 1. Check if important_data.json exists "
"(every 5 min, zero cost)"
)
print(
" 2. Ask for recovery help IF it's missing "
"(costs tokens)"
)
print("\nNothing else. You have complete control.\n")
pb.start()
try:
print("Running... Press Ctrl+C to stop\n")
while True:
time.sleep(1)
except KeyboardInterrupt:
pb.stop()
if __name__ == "__main__":
main()

196
examples/example_usage.py Normal file
View File

@@ -0,0 +1,196 @@
"""Example: Using the Memory System with SOUL and User files."""
from memory_system import MemorySystem
def main() -> None:
print("=" * 60)
print("Memory System - SOUL + User Files Example")
print("=" * 60)
memory = MemorySystem()
# 1. SOUL - Define agent personality
print("\n[1] Updating SOUL (Agent Personality)...")
memory.update_soul(
"""
## Additional Traits
- I remember user preferences and adapt
- I maintain context across conversations
- I learn from corrections and feedback
## Goals
- Help users be more productive
- Provide accurate, helpful information
- Build long-term relationships through memory
""",
append=True,
)
# 2. Create user profiles
print("\n[2] Creating user profiles...")
memory.update_user(
"alice",
"""
## Personal Info
- Name: Alice Johnson
- Role: Senior Python Developer
- Timezone: America/New_York (EST)
- Active hours: 9 AM - 6 PM EST
## Preferences
- Communication: Detailed technical explanations
- Code style: PEP 8, type hints, docstrings
- Favorite tools: VS Code, pytest, black
## Current Projects
- Building a microservices architecture
- Learning Kubernetes
- Migrating legacy Django app
## Recent Conversations
- 2026-02-12: Discussed SQLite full-text search implementation
- 2026-02-12: Asked about memory system design patterns
""",
)
memory.update_user(
"bob",
"""
## Personal Info
- Name: Bob Smith
- Role: Frontend Developer
- Timezone: America/Los_Angeles (PST)
- Active hours: 11 AM - 8 PM PST
## Preferences
- Communication: Concise, bullet points
- Code style: ESLint, Prettier, React best practices
- Favorite tools: WebStorm, Vite, TailwindCSS
## Current Projects
- React dashboard redesign
- Learning TypeScript
- Performance optimization work
## Recent Conversations
- 2026-02-11: Asked about React optimization techniques
- 2026-02-12: Discussed Vite configuration
""",
)
# 3. Add long-term memory
print("\n[3] Adding long-term memory...")
memory.write_memory(
"""
# System Architecture Decisions
## Memory System Design
- **Date**: 2026-02-12
- **Decision**: Use SQLite + Markdown for memory
- **Rationale**: Simple, fast, no external dependencies
- **Files**: SOUL.md for personality, users/*.md for user context
## Search Strategy
- FTS5 for keyword search (fast, built-in)
- No vector embeddings (keep it simple)
- Per-user search capability for privacy
""",
daily=False,
)
# 4. Add daily log
print("\n[4] Adding today's notes...")
memory.write_memory(
"""
## Conversations
### Alice (10:30 AM)
- Discussed memory system implementation
- Showed interest in SQLite FTS5 features
- Plans to integrate into her microservices project
### Bob (2:45 PM)
- Quick question about React performance
- Mentioned working late tonight on dashboard
- Prefers short, actionable answers
""",
daily=True,
)
# 5. Perform searches
print("\n[5] Searching memory...")
print("\n -> Global search for 'python':")
results = memory.search("python", max_results=3)
for r in results:
print(f" {r['path']}:{r['start_line']} - {r['snippet']}")
print("\n -> Alice's memory for 'project':")
alice_results = memory.search_user(
"alice", "project", max_results=2
)
for r in alice_results:
print(f" {r['path']}:{r['start_line']} - {r['snippet']}")
print("\n -> Bob's memory for 'React':")
bob_results = memory.search_user("bob", "React", max_results=2)
for r in bob_results:
print(f" {r['path']}:{r['start_line']} - {r['snippet']}")
# 6. Retrieve specific content
print("\n[6] Retrieving specific content...")
soul = memory.get_soul()
print(f"\n SOUL.md ({len(soul)} chars):")
print(" " + "\n ".join(soul.split("\n")[:5]))
print(" ...")
alice_context = memory.get_user("alice")
print(f"\n Alice's profile ({len(alice_context)} chars):")
print(" " + "\n ".join(alice_context.split("\n")[:5]))
print(" ...")
# 7. Show system status
print("\n[7] System Status:")
status = memory.status()
for key, value in status.items():
print(f" {key}: {value}")
print(f"\n Users: {', '.join(memory.list_users())}")
# 8. Demonstrate contextual response
print("\n" + "=" * 60)
print("CONTEXTUAL RESPONSE EXAMPLE")
print("=" * 60)
def get_context_for_user(username: str) -> str:
"""Build context for an AI response."""
user_soul = memory.get_soul()
user_prefs = memory.get_user(username)
recent_memory = memory.search_user(
username, "recent", max_results=2
)
recent_snippet = (
recent_memory[0]["snippet"]
if recent_memory
else "No recent activity"
)
return (
f"\n=== SOUL ===\n{user_soul[:200]}...\n\n"
f"=== USER: {username} ===\n{user_prefs[:200]}...\n\n"
f"=== RECENT CONTEXT ===\n{recent_snippet}\n"
)
print("\nContext for Alice:")
print(get_context_for_user("alice"))
memory.close()
print("\nMemory system closed")
if __name__ == "__main__":
main()