Files
ajarbot/gitea_tools/client.py
Jordan Ramos fe7c146dc6 feat: Add Gitea MCP integration and project cleanup
## New Features
- **Gitea MCP Tools** (zero API cost):
  - gitea_read_file: Read files from homelab repo
  - gitea_list_files: Browse directories
  - gitea_search_code: Search by filename
  - gitea_get_tree: Get directory tree
- **Gitea Client** (gitea_tools/client.py): REST API wrapper with Personal Access Token authentication
- **Proxmox SSH Scripts** (scripts/): Homelab data collection utilities
- **Obsidian MCP Support** (obsidian_mcp.py): Advanced vault operations
- **Voice Integration Plan** (JARVIS_VOICE_INTEGRATION_PLAN.md)

## Improvements
- **Increased timeout**: 5min → 10min for complex tasks (llm_interface.py)
- **Removed Direct API fallback**: Gitea tools are MCP-only (zero cost)
- **Updated .env.example**: Added Obsidian MCP configuration
- **Enhanced .gitignore**: Protect personal memory files (SOUL.md, MEMORY.md)

## Cleanup
- Deleted 24 obsolete files (temp/test/experimental scripts, outdated docs)
- Untracked personal memory files (SOUL.md, MEMORY.md now in .gitignore)
- Removed: AGENT_SDK_IMPLEMENTATION.md, HYBRID_SEARCH_SUMMARY.md,
  IMPLEMENTATION_SUMMARY.md, MIGRATION.md, test_agent_sdk.py, etc.

## Configuration
- Added config/gitea_config.example.yaml (Gitea setup template)
- Added config/obsidian_mcp.example.yaml (Obsidian MCP template)
- Updated scheduled_tasks.yaml with new task examples

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-18 20:31:32 -07:00

598 lines
20 KiB
Python

"""Gitea API Client - Access private Gitea repositories.
Uses Gitea's REST API (compatible with GitHub API v3) to read files,
list directories, search code, and get directory trees from private repos.
Authentication via Personal Access Token configured in config/gitea_config.yaml.
"""
import base64
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import httpx
import yaml
logger = logging.getLogger(__name__)
# Location of the YAML config file. The path is relative, so it is resolved
# against the working directory of the running process.
_CONFIG_PATH = Path("config/gitea_config.yaml")
# Timeout in seconds handed to httpx.AsyncClient for every API request.
_REQUEST_TIMEOUT = 10.0
# Largest file size in bytes (1,000,000) that get_file_content will decode;
# for bigger files it returns a placeholder message plus metadata instead.
_MAX_FILE_SIZE = 1_000_000
# Decoded file content longer than this many characters is cut off and marked
# as truncated (prevents token explosion in downstream consumers).
_MAX_OUTPUT_CHARS = 5000
class GiteaClient:
    """Client for the Gitea REST API, authenticated with a Personal Access Token.

    Connection settings come from constructor arguments, falling back to
    config/gitea_config.yaml. The API coroutines report HTTP and transport
    failures as ``{"success": False, "error": ...}`` dicts rather than raising.
    ``_parse_repo`` raises ValueError when no repository can be determined.
    """

    # Upper bound on the number of matches returned by search_code().
    _MAX_SEARCH_RESULTS = 20

    # Git object types as reported by the trees API -> names used in results.
    _GIT_TYPE_NAMES = {"blob": "file", "tree": "dir"}

    def __init__(
        self,
        base_url: Optional[str] = None,
        token: Optional[str] = None,
        default_owner: Optional[str] = None,
        default_repo: Optional[str] = None,
    ) -> None:
        """Initialize Gitea client.

        Args:
            base_url: Gitea instance URL (e.g., "https://vulcan.apophisnetworking.net").
            token: Personal Access Token for authentication.
            default_owner: Default repository owner (e.g., "jramos").
            default_repo: Default repository name (e.g., "homelab").

        Any argument that is not provided is read from config/gitea_config.yaml.

        Raises:
            ValueError: If no base URL or no token is available from either
                the arguments or the config file.
        """
        config = self._load_config()
        # "or ''" also covers keys that exist in the YAML file but are empty
        # (parsed as None), which would otherwise break .rstrip() below.
        self.base_url = (base_url or config.get("base_url") or "").rstrip("/")
        self.token = token or config.get("token") or ""
        self.default_owner = default_owner or config.get("default_owner") or ""
        self.default_repo = default_repo or config.get("default_repo") or ""
        if not self.base_url:
            raise ValueError(
                "Gitea base_url not configured. "
                "Set it in config/gitea_config.yaml or pass base_url argument."
            )
        if not self.token:
            raise ValueError(
                "Gitea token not configured. "
                "Create a Personal Access Token at "
                f"{self.base_url}/user/settings/applications "
                "and add it to config/gitea_config.yaml"
            )
        self.api_url = f"{self.base_url}/api/v1"
        logger.info(
            "[Gitea] Client initialized: %s (default: %s/%s)",
            self.base_url,
            self.default_owner,
            self.default_repo,
        )

    @staticmethod
    def _load_config() -> Dict[str, Any]:
        """Load configuration from the YAML file.

        Loading is best-effort: a missing, unreadable, or malformed file is
        logged and yields an empty dict so the caller can fall back to
        constructor arguments.

        Returns:
            The parsed mapping, or ``{}`` when nothing usable could be loaded.
        """
        if not _CONFIG_PATH.exists():
            logger.warning(
                "[Gitea] Config file not found: %s. "
                "Copy config/gitea_config.example.yaml to config/gitea_config.yaml",
                _CONFIG_PATH,
            )
            return {}
        try:
            content = _CONFIG_PATH.read_text(encoding="utf-8")
            config = yaml.safe_load(content) or {}
        except Exception as e:  # deliberate best-effort: never fail on config load
            logger.error("[Gitea] Failed to load config: %s", e)
            return {}
        if not isinstance(config, dict):
            # A top-level list or scalar would break the .get() lookups in __init__.
            logger.error(
                "[Gitea] Failed to load config: expected a mapping in %s, got %s",
                _CONFIG_PATH,
                type(config).__name__,
            )
            return {}
        return config

    def _parse_repo(
        self,
        repo: Optional[str] = None,
        owner: Optional[str] = None,
    ) -> tuple:
        """Parse owner/repo from various input formats.

        Args:
            repo: Repository in "owner/repo" format, or just "repo" name.
                Falls back to the configured default repository.
            owner: Explicit owner (overrides repo string parsing and the
                configured default owner).

        Returns:
            Tuple of (owner, repo) strings.

        Raises:
            ValueError: If either the owner or the repository name ends up empty.
        """
        if repo and "/" in repo:
            parsed_owner, parsed_repo = repo.split("/", 1)
        else:
            parsed_owner = self.default_owner
            parsed_repo = repo or self.default_repo
        if owner:
            parsed_owner = owner
        if not parsed_owner or not parsed_repo:
            raise ValueError(
                f"Repository not specified. Provide repo as 'owner/repo' "
                f"or configure default_owner/default_repo in gitea_config.yaml. "
                f"Got owner='{parsed_owner}', repo='{parsed_repo}'"
            )
        return parsed_owner, parsed_repo

    def _headers(self) -> Dict[str, str]:
        """Build request headers with authentication."""
        return {
            "Authorization": f"token {self.token}",
            "Accept": "application/json",
            "User-Agent": "Garvis/1.0 (Ajarbot Gitea Integration)",
        }

    @staticmethod
    def _contents_endpoint(owner: str, repo: str, path: str) -> str:
        """Build the Contents API endpoint for ``path`` (empty means repo root).

        The path is percent-encoded (slashes kept) so that characters such as
        ``#``, ``?`` or spaces in file names are sent as part of the path
        instead of being parsed as a URL fragment or query string.
        """
        base = f"/repos/{owner}/{repo}/contents"
        return f"{base}/{quote(path)}" if path else base

    @staticmethod
    def _decode_text(encoded: Optional[str]) -> Optional[str]:
        """Decode base64 ``encoded`` as UTF-8 text.

        Returns:
            The decoded string, or None when the payload is missing, is not
            valid base64, or does not decode as UTF-8 (i.e. is binary).
        """
        try:
            return base64.b64decode(encoded).decode("utf-8")
        except (TypeError, ValueError):
            # TypeError: payload is None / not a string.
            # ValueError: covers binascii.Error and UnicodeDecodeError.
            return None

    @staticmethod
    def _match_paths(
        entries: List[Dict[str, Any]],
        query: str,
        limit: int,
    ) -> List[Dict[str, Any]]:
        """Return up to ``limit`` tree entries whose path contains ``query``.

        Matching is a case-insensitive substring test on the entry path; the
        input order of ``entries`` is preserved.
        """
        needle = query.lower()
        matches: List[Dict[str, Any]] = []
        for entry in entries:
            if len(matches) >= limit:
                break
            path = entry.get("path", "")
            if needle in path.lower():
                matches.append({
                    "path": path,
                    "type": entry.get("type", ""),
                    "size": entry.get("size", 0),
                    "match_type": "filename",
                })
        return matches

    async def _request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Make an authenticated API request.

        Args:
            method: HTTP method (GET, POST, etc.).
            endpoint: API endpoint path (e.g., "/repos/jramos/homelab/contents/README.md").
            params: Optional query parameters.

        Returns:
            ``{"success": True, "data": <parsed JSON>}`` on a response below
            HTTP 400, otherwise ``{"success": False, "error": <message>}``.
            Timeouts, connection errors and any other exception raised while
            performing the request are converted into the error form as well.
        """
        url = f"{self.api_url}{endpoint}"
        try:
            async with httpx.AsyncClient(
                timeout=_REQUEST_TIMEOUT,
                follow_redirects=True,
                verify=True,
                headers=self._headers(),
            ) as client:
                response = await client.request(method, url, params=params)
                if response.status_code == 401:
                    return {
                        "success": False,
                        "error": (
                            "Authentication failed (HTTP 401). "
                            "Check your Personal Access Token in config/gitea_config.yaml. "
                            f"Generate a new token at: {self.base_url}/user/settings/applications"
                        ),
                    }
                if response.status_code == 404:
                    # get_tree() looks for "404" in this message to decide
                    # whether to try a fallback branch name.
                    return {
                        "success": False,
                        "error": f"Not found (HTTP 404): {endpoint}",
                    }
                if response.status_code >= 400:
                    return {
                        "success": False,
                        "error": f"HTTP {response.status_code}: {response.text[:200]}",
                    }
                return {"success": True, "data": response.json()}
        except httpx.TimeoutException:
            return {
                "success": False,
                "error": f"Request to {self.base_url} timed out after {_REQUEST_TIMEOUT}s",
            }
        except httpx.ConnectError as e:
            return {
                "success": False,
                "error": f"Connection failed to {self.base_url}: {e}",
            }
        except Exception as e:
            # Boundary handler: callers rely on a result dict, never an exception.
            return {
                "success": False,
                "error": f"Request failed: {str(e)}",
            }

    async def get_file_content(
        self,
        file_path: str,
        owner: Optional[str] = None,
        repo: Optional[str] = None,
        branch: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Get the text content of a file in a repository.

        Uses Gitea Contents API: GET /repos/{owner}/{repo}/contents/{filepath}

        Args:
            file_path: Path to file in repo (e.g., "scripts/proxmox_collector.py").
            owner: Repository owner (default: from config).
            repo: Repository name or "owner/repo" (default: from config).
            branch: Branch name; when omitted no ``ref`` is sent and the
                server chooses the ref.

        Returns:
            Dict with "success", and either "content"/"metadata" or "error".
            Files larger than ``_MAX_FILE_SIZE`` and files that are not valid
            UTF-8 text yield a placeholder message as "content". Text longer
            than ``_MAX_OUTPUT_CHARS`` is cut and flagged with
            ``metadata["truncated"]``.

        Raises:
            ValueError: If no repository can be determined.
        """
        parsed_owner, parsed_repo = self._parse_repo(repo, owner)
        # Normalize file path (remove leading slash)
        file_path = file_path.lstrip("/")
        endpoint = self._contents_endpoint(parsed_owner, parsed_repo, file_path)
        params = {"ref": branch} if branch else {}
        result = await self._request("GET", endpoint, params=params)
        if not result["success"]:
            return result
        data = result["data"]
        # The Contents API returns a list when the path is a directory.
        if isinstance(data, list):
            return {
                "success": False,
                "error": (
                    f"'{file_path}' is a directory, not a file. "
                    f"Use gitea_list_files to browse directories."
                ),
            }
        file_size = data.get("size") or 0
        # Fields shared by every successful response shape.
        metadata = {
            "name": data.get("name", ""),
            "path": data.get("path", ""),
            "size": file_size,
            "sha": data.get("sha", ""),
            "download_url": data.get("download_url", ""),
        }
        if file_size > _MAX_FILE_SIZE:
            return {
                "success": True,
                "content": (
                    f"[File too large: {file_size:,} bytes ({file_size / 1024 / 1024:.1f} MB). "
                    f"Maximum is {_MAX_FILE_SIZE:,} bytes. "
                    f"Use the download URL to fetch it directly.]"
                ),
                "metadata": metadata,
            }
        content = self._decode_text(data.get("content", ""))
        if content is None:
            return {
                "success": True,
                "content": "[Binary file - cannot display as text]",
                "metadata": {**metadata, "encoding": data.get("encoding", "")},
            }
        truncated = len(content) > _MAX_OUTPUT_CHARS
        if truncated:
            content = content[:_MAX_OUTPUT_CHARS] + "\n\n... (file truncated)"
        return {
            "success": True,
            "content": content,
            "metadata": {
                **metadata,
                "last_commit_sha": data.get("last_commit_sha", ""),
                "truncated": truncated,
            },
        }

    async def list_files(
        self,
        path: str = "",
        owner: Optional[str] = None,
        repo: Optional[str] = None,
        branch: Optional[str] = None,
    ) -> Dict[str, Any]:
        """List files and directories at a path in the repository.

        Uses Gitea Contents API: GET /repos/{owner}/{repo}/contents/{path}

        Args:
            path: Directory path in repo (e.g., "scripts/"). Empty for root.
            owner: Repository owner.
            repo: Repository name or "owner/repo".
            branch: Branch name; when omitted no ``ref`` is sent.

        Returns:
            Dict with "success" and either "files" (directories first, then
            files, each group sorted case-insensitively by name) plus
            "path"/"repo"/"count", or "error".

        Raises:
            ValueError: If no repository can be determined.
        """
        parsed_owner, parsed_repo = self._parse_repo(repo, owner)
        # Normalize path
        path = path.strip("/")
        endpoint = self._contents_endpoint(parsed_owner, parsed_repo, path)
        params = {"ref": branch} if branch else {}
        result = await self._request("GET", endpoint, params=params)
        if not result["success"]:
            return result
        data = result["data"]
        # The Contents API returns a single object when the path is a file.
        if isinstance(data, dict):
            return {
                "success": False,
                "error": (
                    f"'{path}' is a file, not a directory. "
                    f"Use gitea_read_file to read file contents."
                ),
            }
        files = [
            {
                "name": item.get("name", ""),
                "type": item.get("type", ""),  # "file" or "dir"
                "path": item.get("path", ""),
                "size": item.get("size", 0),
            }
            for item in data
        ]
        # Sort: directories first, then files, alphabetically
        files.sort(key=lambda f: (0 if f["type"] == "dir" else 1, f["name"].lower()))
        return {
            "success": True,
            "files": files,
            "path": path or "/",
            "repo": f"{parsed_owner}/{parsed_repo}",
            "count": len(files),
        }

    async def search_code(
        self,
        query: str,
        owner: Optional[str] = None,
        repo: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Search a repository for files and directories by path.

        This does NOT search file contents. It fetches the recursive tree via
        ``get_tree`` and keeps entries whose path contains ``query``
        (case-insensitive), returning at most ``_MAX_SEARCH_RESULTS`` matches.

        Args:
            query: Substring to look for in file/directory paths.
            owner: Repository owner.
            repo: Repository name or "owner/repo".

        Returns:
            Dict with "success" and either "results"/"query"/"repo"/"count"
            (plus an explanatory "message" when nothing matched) or "error".

        Raises:
            ValueError: If no repository can be determined.
        """
        parsed_owner, parsed_repo = self._parse_repo(repo, owner)
        tree_result = await self.get_tree(
            owner=parsed_owner,
            repo=parsed_repo,
            recursive=True,
        )
        if not tree_result["success"]:
            return tree_result
        matches = self._match_paths(
            tree_result.get("entries", []),
            query,
            self._MAX_SEARCH_RESULTS,
        )
        response: Dict[str, Any] = {
            "success": True,
            "results": matches,
            "query": query,
            "repo": f"{parsed_owner}/{parsed_repo}",
            "count": len(matches),
        }
        if not matches:
            response["message"] = (
                f"No files matching '{query}' found in "
                f"{parsed_owner}/{parsed_repo}. "
                f"Note: This searches file/directory names only. "
                f"For content search, read specific files with gitea_read_file."
            )
        return response

    async def _default_branch(self, repo_endpoint: str) -> Optional[str]:
        """Return the repository's ``default_branch`` value, or None if unavailable.

        Args:
            repo_endpoint: Endpoint of the repository, e.g. "/repos/owner/name".
        """
        result = await self._request("GET", repo_endpoint)
        if not result["success"]:
            return None
        data = result["data"]
        if not isinstance(data, dict):
            return None
        return data.get("default_branch") or None

    async def get_tree(
        self,
        owner: Optional[str] = None,
        repo: Optional[str] = None,
        branch: Optional[str] = None,
        recursive: bool = False,
    ) -> Dict[str, Any]:
        """Get the directory tree of a repository.

        Uses Gitea Git Trees API: GET /repos/{owner}/{repo}/git/trees/{sha}

        Args:
            owner: Repository owner.
            repo: Repository name or "owner/repo".
            branch: Branch name. When omitted, the repository's default
                branch is looked up; if that lookup fails, "main" and then
                "master" are tried.
            recursive: If True, get full recursive tree.

        Returns:
            Dict with "success" and either "entries" (directories first, then
            files, sorted case-insensitively by path) plus "branch"/"repo"/
            "total"/"truncated", or "error".

        Raises:
            ValueError: If no repository can be determined.
        """
        parsed_owner, parsed_repo = self._parse_repo(repo, owner)
        repo_endpoint = f"/repos/{parsed_owner}/{parsed_repo}"

        if branch:
            candidates = [branch]
        else:
            default_branch = await self._default_branch(repo_endpoint)
            candidates = [default_branch] if default_branch else ["main", "master"]

        ref = candidates[0]
        branch_result: Dict[str, Any] = {"success": False, "error": ""}
        for ref in candidates:
            branch_result = await self._request(
                "GET", f"{repo_endpoint}/branches/{quote(ref)}"
            )
            # Only a "not found" justifies trying the next candidate name.
            if branch_result["success"] or "404" not in branch_result.get("error", ""):
                break
        if not branch_result["success"]:
            return branch_result

        # The branch's head commit id is passed as the {sha} of the trees endpoint.
        commit = branch_result["data"].get("commit") or {}
        tree_sha = commit.get("id", "")
        if not tree_sha:
            return {
                "success": False,
                "error": f"Could not get tree SHA for branch '{ref}'",
            }

        params = {"recursive": "true"} if recursive else {}
        tree_result = await self._request(
            "GET", f"{repo_endpoint}/git/trees/{tree_sha}", params=params
        )
        if not tree_result["success"]:
            return tree_result
        tree_data = tree_result["data"]

        entries = []
        for raw in tree_data.get("tree", []):
            git_type = raw.get("type", "")
            entries.append({
                "path": raw.get("path", ""),
                # Unknown object types are passed through unchanged.
                "type": self._GIT_TYPE_NAMES.get(git_type, git_type),
                "size": raw.get("size", 0),
                "sha": raw.get("sha", ""),
            })
        # Sort: directories first, then files
        entries.sort(key=lambda e: (0 if e["type"] == "dir" else 1, e["path"].lower()))
        return {
            "success": True,
            "entries": entries,
            "branch": ref,
            "repo": f"{parsed_owner}/{parsed_repo}",
            "total": len(entries),
            "truncated": tree_data.get("truncated", False),
        }
# Singleton client instance (lazy-loaded). Stays None until
# get_gitea_client() manages to construct a GiteaClient.
_gitea_client: Optional[GiteaClient] = None
def get_gitea_client() -> Optional[GiteaClient]:
    """Return the shared Gitea client, constructing it on first use.

    A successfully built client is cached in the module-level
    ``_gitea_client`` and reused by later calls. Construction failures are
    logged (ValueError as a warning, anything else as an error) and are not
    cached, so the next call tries again.

    Returns:
        The shared GiteaClient, or None if it could not be created.
    """
    global _gitea_client
    if _gitea_client is None:
        try:
            _gitea_client = GiteaClient()
        except ValueError as config_problem:
            logger.warning("[Gitea] Client not available: %s", config_problem)
            return None
        except Exception as unexpected:
            logger.error("[Gitea] Failed to initialize client: %s", unexpected)
            return None
    return _gitea_client