"""Loki HTTP API Client. Handles all communication with Loki's query endpoints: - /loki/api/v1/query_range (log queries over a time window) - /loki/api/v1/labels (list all label names) - /loki/api/v1/label/{name}/values (values for a specific label) - /loki/api/v1/series (active label sets / streams) """ import logging from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional import httpx from mcp_servers.loki.config import ( LOKI_URL, LOKI_TIMEOUT, DEFAULT_LIMIT, DEFAULT_RANGE_HOURS, ) logger = logging.getLogger(__name__) class LokiClient: """Async HTTP client for Loki's query API.""" def __init__( self, url: Optional[str] = None, timeout: Optional[int] = None, ): self.url = url or LOKI_URL self.timeout = timeout or LOKI_TIMEOUT self._client: Optional[httpx.AsyncClient] = None async def _get_client(self) -> httpx.AsyncClient: """Lazy-init the async HTTP client.""" if self._client is None or self._client.is_closed: self._client = httpx.AsyncClient( base_url=self.url, timeout=self.timeout, ) return self._client async def close(self): """Close the HTTP client.""" if self._client and not self._client.is_closed: await self._client.aclose() # ------------------------------------------------------------------ # Time helpers # ------------------------------------------------------------------ @staticmethod def _to_nano_ts(dt: datetime) -> str: """Convert a datetime to Loki's nanosecond-epoch string.""" return str(int(dt.timestamp() * 1_000_000_000)) @staticmethod def _default_range(hours: Optional[int] = None): """Return (start, end) as nano-epoch strings for the last N hours.""" now = datetime.now(timezone.utc) hrs = hours or DEFAULT_RANGE_HOURS start = now - timedelta(hours=hrs) return ( LokiClient._to_nano_ts(start), LokiClient._to_nano_ts(now), ) # ------------------------------------------------------------------ # Core queries # ------------------------------------------------------------------ async def query_range( self, query: str, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None, hours: Optional[int] = None, ) -> Dict[str, Any]: """Run a LogQL query over a time range. Args: query: LogQL expression, e.g. '{job="varlogs"} |= "error"' start: Nano-epoch start (optional — defaults to now-). end: Nano-epoch end (optional — defaults to now). limit: Max log lines to return. hours: Shorthand for "last N hours" (ignored if start/end given). Returns: Raw JSON response from Loki (dict). """ if not start or not end: start, end = self._default_range(hours) params = { "query": query, "start": start, "end": end, "limit": limit or DEFAULT_LIMIT, } client = await self._get_client() logger.debug("[Loki] query_range: %s", params) resp = await client.get("/loki/api/v1/query_range", params=params) resp.raise_for_status() return resp.json() async def labels(self) -> List[str]: """List all known label names.""" client = await self._get_client() resp = await client.get("/loki/api/v1/labels") resp.raise_for_status() data = resp.json() return data.get("data", []) async def label_values(self, label: str) -> List[str]: """List values for a specific label (e.g. 'job', 'host').""" client = await self._get_client() resp = await client.get(f"/loki/api/v1/label/{label}/values") resp.raise_for_status() data = resp.json() return data.get("data", []) async def series( self, match: Optional[List[str]] = None, hours: Optional[int] = None, ) -> List[Dict[str, str]]: """List active streams/series matching optional selectors. Args: match: List of LogQL stream selectors, e.g. ['{job="varlogs"}']. hours: Time window to search (default: DEFAULT_RANGE_HOURS). """ start, end = self._default_range(hours) params: Dict[str, Any] = {"start": start, "end": end} if match: params["match[]"] = match client = await self._get_client() resp = await client.get("/loki/api/v1/series", params=params) resp.raise_for_status() data = resp.json() return data.get("data", []) async def health(self) -> bool: """Quick health check — hits /ready endpoint.""" try: client = await self._get_client() resp = await client.get("/ready") return resp.status_code == 200 except Exception as e: logger.warning("[Loki] Health check failed: %s", e) return False # ------------------------------------------------------------------ # Convenience: extract just the log lines from a query_range result # ------------------------------------------------------------------ @staticmethod def extract_lines(result: Dict[str, Any]) -> List[Dict[str, str]]: """Pull log lines from a query_range response. Returns list of {"timestamp": ..., "line": ..., "labels": ...} dicts, sorted newest-first. """ lines = [] data = result.get("data", {}) for stream in data.get("result", []): labels = stream.get("stream", {}) label_str = ", ".join(f'{k}="{v}"' for k, v in labels.items()) for ts, line in stream.get("values", []): # Convert nano-epoch to human-readable dt = datetime.fromtimestamp( int(ts) / 1_000_000_000, tz=timezone.utc ) lines.append({ "timestamp": dt.strftime("%Y-%m-%d %H:%M:%S UTC"), "line": line, "labels": label_str, }) # Newest first lines.sort(key=lambda x: x["timestamp"], reverse=True) return lines