Files
ajarbot/mcp_servers/loki/loki_client.py

190 lines
6.3 KiB
Python
Raw Normal View History

"""Loki HTTP API Client.
Handles all communication with Loki's query endpoints:
- /loki/api/v1/query_range (log queries over a time window)
- /loki/api/v1/labels (list all label names)
- /loki/api/v1/label/{name}/values (values for a specific label)
- /loki/api/v1/series (active label sets / streams)
"""
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote

import httpx

from mcp_servers.loki.config import (
    LOKI_URL,
    LOKI_TIMEOUT,
    DEFAULT_LIMIT,
    DEFAULT_RANGE_HOURS,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class LokiClient:
    """Async HTTP client for Loki's query API.

    The underlying ``httpx.AsyncClient`` is created lazily on first use and
    re-created if it has been closed. Call :meth:`close` when finished.
    """

    def __init__(
        self,
        url: Optional[str] = None,
        timeout: Optional[int] = None,
    ):
        """Store connection settings; no network I/O happens here.

        Args:
            url: Base URL of the Loki server. Falsy values fall back to
                ``LOKI_URL`` from the config module.
            timeout: Request timeout passed to httpx. Falsy values fall back
                to ``LOKI_TIMEOUT`` from the config module.
        """
        self.url = url or LOKI_URL
        self.timeout = timeout or LOKI_TIMEOUT
        self._client: Optional[httpx.AsyncClient] = None

    async def _get_client(self) -> "httpx.AsyncClient":
        """Return the shared async HTTP client, creating it if needed."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                base_url=self.url,
                timeout=self.timeout,
            )
        return self._client

    async def close(self):
        """Close the HTTP client if one is open."""
        if self._client and not self._client.is_closed:
            await self._client.aclose()

    # ------------------------------------------------------------------
    # Time helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_nano_ts(dt: datetime) -> str:
        """Convert a datetime to Loki's nanosecond-epoch string.

        Integer arithmetic is used instead of ``dt.timestamp() * 1e9``: a
        nanosecond epoch exceeds what a float can represent exactly, so the
        float product silently perturbs the low digits.

        Args:
            dt: The instant to convert. A naive datetime is interpreted as
                local time (the same convention as ``datetime.timestamp()``).

        Returns:
            Nanoseconds since the Unix epoch, as a decimal string.
        """
        epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
        # astimezone() presumes local time for naive datetimes.
        micros = (dt.astimezone(timezone.utc) - epoch) // timedelta(microseconds=1)
        return str(micros * 1000)

    @staticmethod
    def _default_range(hours: Optional[int] = None) -> Tuple[str, str]:
        """Return (start, end) as nano-epoch strings for the last N hours.

        Args:
            hours: Window length in hours. Falsy values fall back to
                ``DEFAULT_RANGE_HOURS``.
        """
        now = datetime.now(timezone.utc)
        hrs = hours or DEFAULT_RANGE_HOURS
        start = now - timedelta(hours=hrs)
        return (
            LokiClient._to_nano_ts(start),
            LokiClient._to_nano_ts(now),
        )

    @staticmethod
    def _resolve_range(
        start: Optional[str],
        end: Optional[str],
        hours: Optional[int],
    ) -> Tuple[str, str]:
        """Fill in whichever of start/end is missing from the default range.

        A bound supplied by the caller is always kept; only the missing
        bound(s) are taken from ``_default_range(hours)``.
        """
        if start and end:
            return start, end
        default_start, default_end = LokiClient._default_range(hours)
        return start or default_start, end or default_end

    # ------------------------------------------------------------------
    # Core queries
    # ------------------------------------------------------------------
    async def query_range(
        self,
        query: str,
        start: Optional[str] = None,
        end: Optional[str] = None,
        limit: Optional[int] = None,
        hours: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Run a LogQL query over a time range.

        Args:
            query: LogQL expression, e.g. '{job="varlogs"} |= "error"'
            start: Nano-epoch start (optional, defaults to now-<hours>).
            end: Nano-epoch end (optional, defaults to now).
            limit: Max log lines to return (falsy -> ``DEFAULT_LIMIT``).
            hours: Shorthand for "last N hours"; only used for whichever of
                start/end is not given.

        Returns:
            Raw JSON response from Loki (dict).

        Raises:
            httpx.HTTPStatusError: If Loki answers with an error status.
        """
        start, end = self._resolve_range(start, end, hours)
        params = {
            "query": query,
            "start": start,
            "end": end,
            "limit": limit or DEFAULT_LIMIT,
        }
        client = await self._get_client()
        logger.debug("[Loki] query_range: %s", params)
        resp = await client.get("/loki/api/v1/query_range", params=params)
        resp.raise_for_status()
        return resp.json()

    async def labels(self) -> List[str]:
        """List all known label names.

        Returns:
            The ``data`` list of the response, or ``[]`` if it is absent.
        """
        client = await self._get_client()
        resp = await client.get("/loki/api/v1/labels")
        resp.raise_for_status()
        data = resp.json()
        return data.get("data", [])

    async def label_values(self, label: str) -> List[str]:
        """List values for a specific label (e.g. 'job', 'host').

        Args:
            label: Label name. It is percent-encoded before being placed in
                the URL path so characters such as '/' cannot alter the
                endpoint being requested.

        Returns:
            The ``data`` list of the response, or ``[]`` if it is absent.
        """
        client = await self._get_client()
        encoded = quote(label, safe="")
        resp = await client.get(f"/loki/api/v1/label/{encoded}/values")
        resp.raise_for_status()
        data = resp.json()
        return data.get("data", [])

    async def series(
        self,
        match: Optional[List[str]] = None,
        hours: Optional[int] = None,
    ) -> List[Dict[str, str]]:
        """List active streams/series matching optional selectors.

        Args:
            match: List of LogQL stream selectors, e.g. ['{job="varlogs"}'].
            hours: Time window to search (default: DEFAULT_RANGE_HOURS).

        Returns:
            The ``data`` list of the response, or ``[]`` if it is absent.
        """
        start, end = self._default_range(hours)
        params: Dict[str, Any] = {"start": start, "end": end}
        if match:
            params["match[]"] = match
        client = await self._get_client()
        resp = await client.get("/loki/api/v1/series", params=params)
        resp.raise_for_status()
        data = resp.json()
        return data.get("data", [])

    async def health(self) -> bool:
        """Quick health check — hits /ready endpoint.

        Returns:
            True if the endpoint answers with status 200; False on any other
            status or if the request raises (the error is logged).
        """
        try:
            client = await self._get_client()
            resp = await client.get("/ready")
            return resp.status_code == 200
        except Exception as e:
            # Deliberately best-effort: a health probe must never raise.
            logger.warning("[Loki] Health check failed: %s", e)
            return False

    # ------------------------------------------------------------------
    # Convenience: extract just the log lines from a query_range result
    # ------------------------------------------------------------------
    @staticmethod
    def extract_lines(result: Dict[str, Any]) -> List[Dict[str, str]]:
        """Pull log lines from a query_range response.

        Reads ``result["data"]["result"]``; for each stream, its ``stream``
        label mapping and its ``values`` list of (timestamp, line) pairs.

        Returns list of {"timestamp": ..., "line": ..., "labels": ...} dicts,
        sorted newest-first by the original nanosecond timestamp (so ordering
        is preserved even within the same second).
        """
        entries = []  # (nanosecond timestamp, output dict)
        data = result.get("data", {})
        for stream in data.get("result", []):
            labels = stream.get("stream", {})
            label_str = ", ".join(f'{k}="{v}"' for k, v in labels.items())
            for ts, line in stream.get("values", []):
                ts_ns = int(ts)
                # Floor division avoids float rounding pushing a timestamp
                # near a second boundary into the following second.
                dt = datetime.fromtimestamp(
                    ts_ns // 1_000_000_000, tz=timezone.utc
                )
                entries.append((
                    ts_ns,
                    {
                        "timestamp": dt.strftime("%Y-%m-%d %H:%M:%S UTC"),
                        "line": line,
                        "labels": label_str,
                    },
                ))
        # Newest first; the sort is stable, so ties keep response order.
        entries.sort(key=lambda item: item[0], reverse=True)
        return [entry for _, entry in entries]