"""Gitea API Client - Access private Gitea repositories.

Uses Gitea's REST API (compatible with GitHub API v3) to read files,
list directories, search file paths, and get directory trees from private
repos.

Authentication via Personal Access Token configured in
config/gitea_config.yaml.
"""

import base64
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote

import httpx
import yaml

logger = logging.getLogger(__name__)

# Config file path
_CONFIG_PATH = Path("config/gitea_config.yaml")

# Request timeout (seconds)
_REQUEST_TIMEOUT = 10.0

# Maximum file size to return (1MB)
_MAX_FILE_SIZE = 1_000_000

# Maximum output characters (prevents token explosion)
_MAX_OUTPUT_CHARS = 5000

# Maximum number of matches returned by GiteaClient.search_code
_MAX_SEARCH_RESULTS = 20


class GiteaClient:
    """Client for Gitea REST API with Personal Access Token authentication.

    Every public coroutine returns a dict with a boolean ``"success"`` key.
    On failure the dict carries an ``"error"`` message (and, for HTTP-level
    failures, a ``"status_code"``); on success it carries the
    method-specific payload keys documented on each method.
    """

    def __init__(
        self,
        base_url: Optional[str] = None,
        token: Optional[str] = None,
        default_owner: Optional[str] = None,
        default_repo: Optional[str] = None,
    ) -> None:
        """Initialize Gitea client.

        Args:
            base_url: Gitea instance URL (e.g., "https://vulcan.apophisnetworking.net").
            token: Personal Access Token for authentication.
            default_owner: Default repository owner (e.g., "jramos").
            default_repo: Default repository name (e.g., "homelab").

        If arguments are not provided, reads from config/gitea_config.yaml.

        Raises:
            ValueError: If no base URL or no token is available from either
                the arguments or the config file.
        """
        config = self._load_config()

        # `or ""` (rather than a .get default) also covers keys that are
        # present in the YAML but left blank, which load as None.
        self.base_url = (base_url or config.get("base_url") or "").rstrip("/")
        self.token = token or config.get("token") or ""
        self.default_owner = default_owner or config.get("default_owner") or ""
        self.default_repo = default_repo or config.get("default_repo") or ""

        if not self.base_url:
            raise ValueError(
                "Gitea base_url not configured. "
                "Set it in config/gitea_config.yaml or pass base_url argument."
            )

        if not self.token:
            raise ValueError(
                "Gitea token not configured. "
                "Create a Personal Access Token at "
                f"{self.base_url}/user/settings/applications "
                "and add it to config/gitea_config.yaml"
            )

        self.api_url = f"{self.base_url}/api/v1"

        logger.info(
            "[Gitea] Client initialized: %s (default: %s/%s)",
            self.base_url,
            self.default_owner,
            self.default_repo,
        )

    @staticmethod
    def _load_config() -> Dict[str, Any]:
        """Load configuration from YAML file.

        Returns:
            The parsed mapping, or an empty dict when the file is missing,
            unreadable, not valid YAML, or does not contain a mapping at
            the top level.
        """
        if not _CONFIG_PATH.exists():
            logger.warning(
                "[Gitea] Config file not found: %s. "
                "Copy config/gitea_config.example.yaml to config/gitea_config.yaml",
                _CONFIG_PATH,
            )
            return {}

        try:
            content = _CONFIG_PATH.read_text(encoding="utf-8")
            config = yaml.safe_load(content) or {}
        except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
            logger.error("[Gitea] Failed to load config: %s", e)
            return {}

        # A YAML document can legally be a list or a scalar; callers use
        # .get(), so anything but a mapping is treated as "no config".
        if not isinstance(config, dict):
            logger.error(
                "[Gitea] Failed to load config: %s",
                f"expected a mapping at top level, got {type(config).__name__}",
            )
            return {}

        return config

    def _parse_repo(
        self,
        repo: Optional[str] = None,
        owner: Optional[str] = None,
    ) -> Tuple[str, str]:
        """Parse owner/repo from various input formats.

        Args:
            repo: Repository in "owner/repo" format, or just "repo" name.
            owner: Explicit owner (overrides repo string parsing).

        Returns:
            Tuple of (owner, repo) strings.

        Raises:
            ValueError: If either the owner or the repo name cannot be
                determined from the arguments or the configured defaults.
        """
        if repo and "/" in repo:
            parsed_owner, parsed_repo = repo.split("/", 1)
        else:
            parsed_owner = owner or self.default_owner
            parsed_repo = repo or self.default_repo

        # An explicit owner always wins, even over an "owner/repo" string.
        if owner:
            parsed_owner = owner

        if not parsed_owner or not parsed_repo:
            raise ValueError(
                f"Repository not specified. Provide repo as 'owner/repo' "
                f"or configure default_owner/default_repo in gitea_config.yaml. "
                f"Got owner='{parsed_owner}', repo='{parsed_repo}'"
            )

        return parsed_owner, parsed_repo

    @staticmethod
    def _repo_path(owner: str, repo: str) -> str:
        """Return the percent-encoded "/repos/{owner}/{repo}" endpoint prefix."""
        return f"/repos/{quote(owner, safe='')}/{quote(repo, safe='')}"

    def _headers(self) -> Dict[str, str]:
        """Build request headers with authentication."""
        return {
            "Authorization": f"token {self.token}",
            "Accept": "application/json",
            "User-Agent": "Garvis/1.0 (Ajarbot Gitea Integration)",
        }

    async def _request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Make an authenticated API request.

        Args:
            method: HTTP method (GET, POST, etc.).
            endpoint: API endpoint path
                (e.g., "/repos/jramos/homelab/contents/README.md").
            params: Optional query parameters.

        Returns:
            Dict with "success" key and either "data" or "error". HTTP error
            responses additionally carry the numeric "status_code" so that
            callers can branch on it without parsing the message.
        """
        url = f"{self.api_url}{endpoint}"

        try:
            async with httpx.AsyncClient(
                timeout=_REQUEST_TIMEOUT,
                follow_redirects=True,
                verify=True,
                headers=self._headers(),
            ) as client:
                response = await client.request(method, url, params=params)

            status = response.status_code

            if status == 401:
                return {
                    "success": False,
                    "status_code": status,
                    "error": (
                        "Authentication failed (HTTP 401). "
                        "Check your Personal Access Token in config/gitea_config.yaml. "
                        f"Generate a new token at: {self.base_url}/user/settings/applications"
                    ),
                }
            if status == 404:
                return {
                    "success": False,
                    "status_code": status,
                    "error": f"Not found (HTTP 404): {endpoint}",
                }
            if status >= 400:
                return {
                    "success": False,
                    "status_code": status,
                    "error": f"HTTP {status}: {response.text[:200]}",
                }

            return {"success": True, "data": response.json()}

        except httpx.TimeoutException:
            return {
                "success": False,
                "error": f"Request to {self.base_url} timed out after {_REQUEST_TIMEOUT}s",
            }
        except httpx.ConnectError as e:
            return {
                "success": False,
                "error": f"Connection failed to {self.base_url}: {e}",
            }
        except Exception as e:
            # Boundary handler: this method reports every failure (including
            # a non-JSON response body) as an error dict instead of raising.
            return {
                "success": False,
                "error": f"Request failed: {str(e)}",
            }

    async def get_file_content(
        self,
        file_path: str,
        owner: Optional[str] = None,
        repo: Optional[str] = None,
        branch: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Get raw file content from a repository.

        Uses Gitea Contents API:
            GET /repos/{owner}/{repo}/contents/{filepath}

        Args:
            file_path: Path to file in repo (e.g., "scripts/proxmox_collector.py").
            owner: Repository owner (default: from config).
            repo: Repository name or "owner/repo" (default: from config).
            branch: Branch name (default: repo default branch).

        Returns:
            Dict with "success", and either "content"/"metadata" or "error".
            Files larger than _MAX_FILE_SIZE and files that do not decode as
            UTF-8 text yield a placeholder "content" string; text longer
            than _MAX_OUTPUT_CHARS is cut and flagged in
            metadata["truncated"].

        Raises:
            ValueError: If the repository cannot be determined.
        """
        parsed_owner, parsed_repo = self._parse_repo(repo, owner)

        # Normalize file path (remove leading slash)
        file_path = file_path.lstrip("/")

        # Percent-encode the path so characters such as '#' or '?' in a file
        # name are not interpreted as URL fragment/query delimiters.
        endpoint = (
            f"{self._repo_path(parsed_owner, parsed_repo)}"
            f"/contents/{quote(file_path, safe='/')}"
        )
        params: Dict[str, Any] = {}
        if branch:
            params["ref"] = branch

        result = await self._request("GET", endpoint, params=params)
        if not result["success"]:
            return result

        data = result["data"]

        # Handle case where path is a directory (returns a list)
        if isinstance(data, list):
            return {
                "success": False,
                "error": (
                    f"'{file_path}' is a directory, not a file. "
                    f"Use gitea_list_files to browse directories."
                ),
            }

        if not isinstance(data, dict):
            return {
                "success": False,
                "error": f"Unexpected response for '{file_path}'",
            }

        # Check file size ("size" may be present but null)
        file_size = data.get("size") or 0
        if file_size > _MAX_FILE_SIZE:
            return {
                "success": True,
                "content": (
                    f"[File too large: {file_size:,} bytes ({file_size / 1024 / 1024:.1f} MB). "
                    f"Maximum is {_MAX_FILE_SIZE:,} bytes. "
                    f"Use the download URL to fetch it directly.]"
                ),
                "metadata": {
                    "name": data.get("name", ""),
                    "path": data.get("path", ""),
                    "size": file_size,
                    "download_url": data.get("download_url", ""),
                    "sha": data.get("sha", ""),
                },
            }

        # Decode base64 content
        encoded_content = data.get("content", "")
        try:
            content = base64.b64decode(encoded_content).decode("utf-8")
        except (ValueError, TypeError):
            # ValueError covers both invalid base64 (binascii.Error) and
            # non-UTF-8 bytes (UnicodeDecodeError); TypeError covers a
            # null/absent content field.
            return {
                "success": True,
                "content": "[Binary file - cannot display as text]",
                "metadata": {
                    "name": data.get("name", ""),
                    "path": data.get("path", ""),
                    "size": file_size,
                    "encoding": data.get("encoding", ""),
                    "download_url": data.get("download_url", ""),
                },
            }

        # Truncate if too long
        truncated = len(content) > _MAX_OUTPUT_CHARS
        if truncated:
            content = content[:_MAX_OUTPUT_CHARS] + "\n\n... (file truncated)"

        return {
            "success": True,
            "content": content,
            "metadata": {
                "name": data.get("name", ""),
                "path": data.get("path", ""),
                "size": file_size,
                "sha": data.get("sha", ""),
                "last_commit_sha": data.get("last_commit_sha", ""),
                "download_url": data.get("download_url", ""),
                "truncated": truncated,
            },
        }

    async def list_files(
        self,
        path: str = "",
        owner: Optional[str] = None,
        repo: Optional[str] = None,
        branch: Optional[str] = None,
    ) -> Dict[str, Any]:
        """List files and directories at a path in the repository.

        Uses Gitea Contents API:
            GET /repos/{owner}/{repo}/contents/{path}

        Args:
            path: Directory path in repo (e.g., "scripts/"). Empty for root.
            owner: Repository owner.
            repo: Repository name or "owner/repo".
            branch: Branch name.

        Returns:
            Dict with "success" and either "files" list or "error". Entries
            are sorted directories first, then case-insensitively by name.

        Raises:
            ValueError: If the repository cannot be determined.
        """
        parsed_owner, parsed_repo = self._parse_repo(repo, owner)

        # Normalize path
        path = path.strip("/")

        endpoint = f"{self._repo_path(parsed_owner, parsed_repo)}/contents"
        if path:
            # Percent-encode so '#'/'?' in names stay part of the path.
            endpoint = f"{endpoint}/{quote(path, safe='/')}"

        params: Dict[str, Any] = {}
        if branch:
            params["ref"] = branch

        result = await self._request("GET", endpoint, params=params)
        if not result["success"]:
            return result

        data = result["data"]

        # If it's a single file (not a directory), inform the user
        if isinstance(data, dict):
            return {
                "success": False,
                "error": (
                    f"'{path}' is a file, not a directory. "
                    f"Use gitea_read_file to read file contents."
                ),
            }

        # Build file listing
        files: List[Dict[str, Any]] = [
            {
                "name": item.get("name", ""),
                "type": item.get("type", ""),  # "file" or "dir"
                "path": item.get("path", ""),
                "size": item.get("size", 0),
            }
            for item in data
        ]

        # Sort: directories first, then files, alphabetically
        files.sort(key=lambda f: (0 if f["type"] == "dir" else 1, f["name"].lower()))

        return {
            "success": True,
            "files": files,
            "path": path or "/",
            "repo": f"{parsed_owner}/{parsed_repo}",
            "count": len(files),
        }

    async def search_code(
        self,
        query: str,
        owner: Optional[str] = None,
        repo: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Search a repository for files whose path contains the query.

        This does NOT search file contents. It fetches the recursive tree
        via get_tree() and does a case-insensitive substring match of
        `query` against each entry's path. At most _MAX_SEARCH_RESULTS
        matches are returned.

        Args:
            query: Search query string.
            owner: Repository owner.
            repo: Repository name or "owner/repo".

        Returns:
            Dict with "success" and either "results" or "error". Successful
            results also carry "query", "repo", "count" and "truncated"
            (True when more matches existed than were returned); the
            no-match case adds an explanatory "message".

        Raises:
            ValueError: If the repository cannot be determined.
        """
        parsed_owner, parsed_repo = self._parse_repo(repo, owner)
        repo_label = f"{parsed_owner}/{parsed_repo}"

        # Strategy: Get flat tree, filter by query in filename and path
        tree_result = await self.get_tree(
            owner=parsed_owner,
            repo=parsed_repo,
            recursive=True,
        )

        if not tree_result["success"]:
            return tree_result

        query_lower = query.lower()

        # Search filenames and paths
        all_matches: List[Dict[str, Any]] = [
            {
                "path": entry.get("path", ""),
                "type": entry.get("type", ""),
                "size": entry.get("size", 0),
                "match_type": "filename",
            }
            for entry in tree_result.get("entries", [])
            if query_lower in entry.get("path", "").lower()
        ]

        # Limit results
        matches = all_matches[:_MAX_SEARCH_RESULTS]

        if not matches:
            return {
                "success": True,
                "results": [],
                "query": query,
                "repo": repo_label,
                "count": 0,
                "truncated": False,
                "message": (
                    f"No files matching '{query}' found in "
                    f"{parsed_owner}/{parsed_repo}. "
                    f"Note: This searches file/directory names only. "
                    f"For content search, read specific files with gitea_read_file."
                ),
            }

        return {
            "success": True,
            "results": matches,
            "query": query,
            "repo": repo_label,
            "count": len(matches),
            "truncated": len(all_matches) > len(matches),
        }

    async def get_tree(
        self,
        owner: Optional[str] = None,
        repo: Optional[str] = None,
        branch: Optional[str] = None,
        recursive: bool = False,
    ) -> Dict[str, Any]:
        """Get the directory tree of a repository.

        Uses Gitea Git Trees API:
            GET /repos/{owner}/{repo}/git/trees/{sha}

        When `branch` is omitted, the repository's `default_branch` is read
        from GET /repos/{owner}/{repo}. If the repository reports no default
        branch, "main" and then "master" are probed.

        Args:
            owner: Repository owner.
            repo: Repository name or "owner/repo".
            branch: Branch name (default: repo default branch).
            recursive: If True, get full recursive tree.

        Returns:
            Dict with "success" and either "entries" list or "error".
            "truncated" mirrors the flag returned by the trees endpoint.

        Raises:
            ValueError: If the repository cannot be determined.
        """
        parsed_owner, parsed_repo = self._parse_repo(repo, owner)
        repo_path = self._repo_path(parsed_owner, parsed_repo)

        # Work out which branch name(s) to try.
        if branch:
            candidates = [branch]
        else:
            repo_result = await self._request("GET", repo_path)
            if not repo_result["success"]:
                return repo_result
            repo_data = repo_result["data"]
            default_branch = (
                repo_data.get("default_branch") if isinstance(repo_data, dict) else None
            )
            candidates = [default_branch] if default_branch else ["main", "master"]

        # Resolve the branch to its head commit; only move on to the next
        # candidate when the current one simply does not exist (404).
        ref = candidates[0]
        branch_result: Dict[str, Any] = {}
        for ref in candidates:
            encoded_ref = quote(ref, safe="/")
            branch_result = await self._request(
                "GET", f"{repo_path}/branches/{encoded_ref}"
            )
            if branch_result["success"] or branch_result.get("status_code") != 404:
                break

        if not branch_result["success"]:
            return branch_result

        branch_data = branch_result["data"]
        # The head commit id is passed as {sha} to the trees endpoint;
        # "commit" may be null.
        tree_sha = (branch_data.get("commit") or {}).get("id", "")

        if not tree_sha:
            return {
                "success": False,
                "error": f"Could not get tree SHA for branch '{ref}'",
            }

        # Get the tree
        tree_endpoint = f"{repo_path}/git/trees/{quote(tree_sha, safe='')}"
        params: Dict[str, Any] = {}
        if recursive:
            params["recursive"] = "true"

        tree_result = await self._request("GET", tree_endpoint, params=params)
        if not tree_result["success"]:
            return tree_result

        tree_data = tree_result["data"]

        # Map git object types to readable types; unknown types pass through.
        readable_types = {"blob": "file", "tree": "dir"}
        entries: List[Dict[str, Any]] = []
        for entry in tree_data.get("tree", []):
            entry_type = entry.get("type", "")
            entries.append({
                "path": entry.get("path", ""),
                "type": readable_types.get(entry_type, entry_type),
                "size": entry.get("size", 0),
                "sha": entry.get("sha", ""),
            })

        # Sort: directories first, then files
        entries.sort(key=lambda e: (0 if e["type"] == "dir" else 1, e["path"].lower()))

        return {
            "success": True,
            "entries": entries,
            "branch": ref,
            "repo": f"{parsed_owner}/{parsed_repo}",
            "total": len(entries),
            "truncated": tree_data.get("truncated", False),
        }


# Singleton client instance (lazy-loaded)
_gitea_client: Optional[GiteaClient] = None


def get_gitea_client() -> Optional[GiteaClient]:
    """Get or create the singleton Gitea client.

    A failed construction is not cached, so a later call tries again.

    Returns:
        The shared GiteaClient, or None if configuration is missing or
        invalid.
    """
    global _gitea_client

    if _gitea_client is not None:
        return _gitea_client

    try:
        _gitea_client = GiteaClient()
    except ValueError as e:
        # Raised by GiteaClient.__init__ when base_url or token is missing.
        logger.warning("[Gitea] Client not available: %s", e)
        return None
    except Exception as e:
        logger.error("[Gitea] Failed to initialize client: %s", e)
        return None

    return _gitea_client