Files

274 lines
9.4 KiB
Python
Raw Permalink Normal View History

"""Loki MCP Server — Exposes homelab log querying via MCP tools.

Runs as a stdio-based MCP server. The Agent SDK spawns this as a
subprocess and communicates via JSON-RPC over stdin/stdout.

Tools:
    loki_query        - Run a LogQL query and get log lines
    loki_labels       - List all known label names
    loki_label_values - Get values for a specific label
    loki_series       - List active log streams
    loki_health       - Check if Loki is reachable

Usage (standalone test):
    python loki_server.py
"""
import asyncio
import logging
import sys
import os
# Add parent paths so imports work when run as subprocess
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
from mcp_servers.loki.loki_client import LokiClient
# Send log output to stderr: the module docstring states that JSON-RPC
# traffic flows over stdin/stdout, so log lines must not be written there.
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
logger = logging.getLogger(__name__)
# Create the MCP server; the tool handlers below register themselves on it
# through the @server.list_tools() / @server.call_tool() decorators.
server = Server("loki")
# Create the Loki client (uses env vars from config).
# Built once at import time and shared by every tool handler in this module.
client = LokiClient()
# ------------------------------------------------------------------
# Tool definitions
# ------------------------------------------------------------------
@server.list_tools()
async def list_tools() -> list[Tool]:
    """Advertise the Loki tools available through this MCP server.

    Returns:
        One ``Tool`` per operation (query, labels, label values, series,
        health), each carrying its name, a description, and the JSON
        Schema of the arguments it accepts.
    """
    # loki_query: the LogQL expression is required, the window and the
    # line cap are optional.
    query_schema = {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": (
                    "LogQL query expression. Must include a stream selector "
                    "in curly braces. Examples:\n"
                    " {job=\"varlogs\"}\n"
                    " {container=\"caddy\"} |= \"error\"\n"
                    " {host=\"proxmox\"} | json | level=\"error\""
                ),
            },
            "hours": {
                "type": "integer",
                "description": "How many hours back to search (default: 1)",
                "default": 1,
            },
            "limit": {
                "type": "integer",
                "description": "Max number of log lines to return (default: 100)",
                "default": 100,
            },
        },
        "required": ["query"],
    }
    query_tool = Tool(
        name="loki_query",
        description=(
            "Query logs from Loki using LogQL. "
            "Examples: '{job=\"varlogs\"} |= \"error\"', "
            "'{container=\"caddy\"} | json | status >= 500'. "
            "Returns log lines with timestamps and labels."
        ),
        inputSchema=query_schema,
    )

    # loki_labels: takes no arguments.
    labels_tool = Tool(
        name="loki_labels",
        description=(
            "List all known label names in Loki. Use this to discover "
            "what labels are available for querying (e.g. job, host, "
            "container, filename)."
        ),
        inputSchema={
            "type": "object",
            "properties": {},
        },
    )

    # loki_label_values: the label name is required.
    label_values_schema = {
        "type": "object",
        "properties": {
            "label": {
                "type": "string",
                "description": "The label name to get values for (e.g. 'job', 'host', 'container')",
            },
        },
        "required": ["label"],
    }
    label_values_tool = Tool(
        name="loki_label_values",
        description=(
            "Get all values for a specific label. Useful for discovering "
            "what jobs, hosts, or containers are sending logs. "
            "Example: label='job' might return ['varlogs', 'caddy', 'grafana']."
        ),
        inputSchema=label_values_schema,
    )

    # loki_series: both the selector and the window are optional.
    series_schema = {
        "type": "object",
        "properties": {
            "match": {
                "type": "string",
                "description": "Optional LogQL stream selector to filter, e.g. '{job=\"varlogs\"}'",
            },
            "hours": {
                "type": "integer",
                "description": "How many hours back to search (default: 1)",
                "default": 1,
            },
        },
    }
    series_tool = Tool(
        name="loki_series",
        description=(
            "List active log streams/series. Shows what label combinations "
            "are actively producing logs. Optionally filter with a stream selector."
        ),
        inputSchema=series_schema,
    )

    # loki_health: takes no arguments.
    health_tool = Tool(
        name="loki_health",
        description="Check if Loki is reachable and responding.",
        inputSchema={
            "type": "object",
            "properties": {},
        },
    )

    return [
        query_tool,
        labels_tool,
        label_values_tool,
        series_tool,
        health_tool,
    ]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Route an incoming tool invocation to the matching handler.

    Args:
        name: Name of the tool being invoked.
        arguments: Arguments supplied by the caller for that tool.

    Returns:
        The handler's text content. An unrecognised tool name, or any
        exception raised while handling the call, is reported as a single
        text item instead of propagating to the caller.
    """
    # Each entry is a zero-argument thunk so that the handler is only looked
    # up and its coroutine only created for the tool actually requested.
    dispatch = {
        "loki_query": lambda: _handle_query(arguments),
        "loki_labels": lambda: _handle_labels(),
        "loki_label_values": lambda: _handle_label_values(arguments),
        "loki_series": lambda: _handle_series(arguments),
        "loki_health": lambda: _handle_health(),
    }
    try:
        start_handler = dispatch.get(name)
        if start_handler is None:
            return [TextContent(type="text", text=f"Unknown tool: {name}")]
        return await start_handler()
    except Exception as exc:
        # Log the full traceback to stderr, but hand the caller only the message.
        logger.error("[Loki MCP] Error in %s: %s", name, exc, exc_info=True)
        return [TextContent(type="text", text=f"Error: {exc!s}")]
# ------------------------------------------------------------------
# Tool handlers
# ------------------------------------------------------------------
async def _handle_query(args: dict) -> list[TextContent]:
    """Execute a LogQL query and render the matching log lines as text.

    Args:
        args: Tool arguments. ``query`` is required; ``hours`` (default 1)
            and ``limit`` (default 100) are optional.

    Returns:
        A single text item: a Loki error message, a "no logs found" notice,
        or a header followed by one formatted line per log entry, cut off
        after 15000 characters.

    Raises:
        KeyError: If ``query`` is absent from ``args``.
    """
    logql = args["query"]
    lookback_hours = args.get("hours", 1)
    max_lines = args.get("limit", 100)

    response = await client.query_range(
        query=logql,
        hours=lookback_hours,
        limit=max_lines,
    )

    # Any status other than "success" is surfaced as an error message.
    if response.get("status") != "success":
        reason = response.get("message", "Unknown error")
        return [TextContent(type="text", text=f"Loki query error: {reason}")]

    # Turn the raw response into per-line entries.
    entries = client.extract_lines(response)
    if not entries:
        notice = f"No logs found for query: {logql} (last {lookback_hours}h)"
        return [TextContent(type="text", text=notice)]

    # Header first, then one rendered row per entry.
    rendered = [f"Found {len(entries)} log lines for: {logql} (last {lookback_hours}h)\n"]
    rendered.extend(
        f"[{item['timestamp']}] ({item['labels']}) {item['line']}"
        for item in entries
    )
    report = "\n".join(rendered)

    # Cap the response size so a large result cannot blow up the token count.
    max_chars = 15000
    if len(report) > max_chars:
        report = report[:max_chars] + f"\n\n... truncated ({len(entries)} total lines)"
    return [TextContent(type="text", text=report)]
async def _handle_labels() -> list[TextContent]:
    """Report the label names returned by the Loki client, sorted.

    Returns:
        A single text item: a bulleted, sorted list of label names preceded
        by a count, or a notice when the client returns nothing.
    """
    names = await client.labels()
    if names:
        heading = f"Available labels ({len(names)}):\n"
        bullets = [f" - {name}" for name in sorted(names)]
        message = heading + "\n".join(bullets)
    else:
        message = "No labels found in Loki."
    return [TextContent(type="text", text=message)]
async def _handle_label_values(args: dict) -> list[TextContent]:
    """Report the values the Loki client returns for one label, sorted.

    Args:
        args: Tool arguments; ``label`` (required) names the label to inspect.

    Returns:
        A single text item: a bulleted, sorted list of values preceded by a
        count, or a notice when the client returns nothing for the label.

    Raises:
        KeyError: If ``label`` is absent from ``args``.
    """
    label_name = args["label"]
    found = await client.label_values(label_name)
    if found:
        heading = f"Values for '{label_name}' ({len(found)}):\n"
        bullets = [f" - {value}" for value in sorted(found)]
        message = heading + "\n".join(bullets)
    else:
        message = f"No values found for label '{label_name}'."
    return [TextContent(type="text", text=message)]
async def _handle_series(args: dict) -> list[TextContent]:
    """Report the streams returned by the Loki client, at most 50 of them.

    Args:
        args: Tool arguments. ``match`` is an optional stream selector and
            ``hours`` (default 1) is the lookback window.

    Returns:
        A single text item: one ``{key="value", ...}`` row per stream for
        the first 50 streams plus a count of any remainder, or a notice
        when the client returns no series.
    """
    selector = args.get("match")
    lookback_hours = args.get("hours", 1)

    # The client takes a list of selectors; a missing or empty one means no filter.
    if selector:
        matchers = [selector]
    else:
        matchers = None

    streams = await client.series(match=matchers, hours=lookback_hours)
    if not streams:
        return [TextContent(type="text", text="No active series found.")]

    display_cap = 50  # Keeps the response from growing without bound.
    rows = [f"Active streams ({len(streams)}):"]
    for label_set in streams[:display_cap]:
        pairs = [f'{key}="{value}"' for key, value in label_set.items()]
        labels_str = ", ".join(pairs)
        rows.append(f" {{{labels_str}}}")

    omitted = len(streams) - display_cap
    if omitted > 0:
        rows.append(f"\n ... and {omitted} more")
    return [TextContent(type="text", text="\n".join(rows))]
async def _handle_health() -> list[TextContent]:
    """Report whether the Loki client's health check succeeds.

    Returns:
        A single text item stating that Loki is or is not reachable at the
        client's configured URL.
    """
    is_up = await client.health()
    if is_up:
        status = f"✅ Loki is healthy and reachable at {client.url}"
    else:
        status = f"❌ Loki is NOT reachable at {client.url}"
    return [TextContent(type="text", text=status)]
# ------------------------------------------------------------------
# Entry point
# ------------------------------------------------------------------
async def main():
    """Start the MCP server and run it on the stdio transport streams."""
    logger.info("[Loki MCP] Starting server (Loki URL: %s)", client.url)
    async with stdio_server() as streams:
        reader, writer = streams
        init_options = server.create_initialization_options()
        await server.run(reader, writer, init_options)


# Only start the server when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())