2025-12-31 20:11:44 -05:00

179 lines
6.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Log anomaly checker that queries Elasticsearch and asks an OpenRouter-hosted LLM
for a quick triage summary. Intended to be run on a schedule (cron/systemd).

Required environment variables:
    ELASTIC_HOST        Elasticsearch base URL, e.g. https://casper.localdomain:9200
    ELASTIC_API_KEY     Base64 ApiKey used for Elasticsearch requests
    OPENROUTER_API_KEY  Token for https://openrouter.ai/

Optional environment variables:
    OPENROUTER_MODEL    Model identifier (default: openai/gpt-4o-mini)
    OPENROUTER_REFERER  Passed through as the HTTP-Referer header
    OPENROUTER_TITLE    Passed through as the X-Title header (default: Elastic Log Monitor)
"""
from __future__ import annotations
import argparse
import datetime as dt
import os
import sys
from typing import Any, Iterable
import requests
def utc_iso(ts: dt.datetime) -> str:
    """Return an ISO 8601 UTC timestamp with second precision and a ``Z`` suffix.

    Naive datetimes are taken to already be in UTC and are formatted as-is.
    Timezone-aware datetimes are converted to UTC first; without that step
    ``isoformat()`` would emit an explicit offset and the appended ``Z`` would
    produce an invalid string such as ``2024-01-01T00:00:00+00:00Z``.
    """
    if ts.tzinfo is not None and ts.utcoffset() is not None:
        # Normalize to UTC and drop tzinfo so isoformat() emits no offset.
        ts = ts.astimezone(dt.timezone.utc).replace(tzinfo=None)
    return ts.replace(microsecond=0).isoformat() + "Z"
def query_elasticsearch(
    host: str,
    api_key: str,
    index_pattern: str,
    minutes: int,
    size: int,
    verify: bool,
) -> list[dict[str, Any]]:
    """Fetch the most recent log documents from Elasticsearch.

    Sends a ``_search`` request for documents whose ``@timestamp`` lies within
    the last ``minutes`` minutes, newest first.

    Args:
        host: Base URL of the Elasticsearch node; a trailing slash is ignored.
        api_key: Value sent in the ``Authorization: ApiKey ...`` header.
        index_pattern: Index name or pattern placed in the request path.
        minutes: Size of the lookback window, in minutes.
        size: Maximum number of hits requested.
        verify: Passed to ``requests`` to enable/disable TLS verification.

    Returns:
        The raw list of hits (``hits.hits`` of the response), or an empty list
        when the response does not contain that key.

    Raises:
        requests.HTTPError: If Elasticsearch answers with an error status.
        requests.RequestException: For connection problems and timeouts.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware "now"
    # and drop tzinfo so utc_iso() still receives a naive UTC value.
    end = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
    start = end - dt.timedelta(minutes=minutes)
    url = f"{host.rstrip('/')}/{index_pattern}/_search"
    payload = {
        "size": size,
        "sort": [{"@timestamp": {"order": "desc"}}],
        "query": {
            "range": {
                "@timestamp": {
                    "gte": utc_iso(start),
                    "lte": utc_iso(end),
                }
            }
        },
        # event.original and host.hostname are the fallbacks build_prompt()
        # reads when message / host.name are absent, so they must be requested
        # here or those fallbacks can never match.
        "_source": [
            "@timestamp",
            "message",
            "event.original",
            "host.name",
            "host.hostname",
            "container.image.name",
            "log.level",
        ],
    }
    headers = {
        "Authorization": f"ApiKey {api_key}",
        "Content-Type": "application/json",
    }
    response = requests.post(url, json=payload, headers=headers, timeout=30, verify=verify)
    response.raise_for_status()
    hits = response.json().get("hits", {}).get("hits", [])
    return hits
def _lookup_field(source: Any, dotted_key: str) -> Any:
    """Return ``source[a][b]...`` for ``"a.b..."``, else the flat ``"a.b"`` key, else None."""
    if not isinstance(source, dict):
        return None
    node: Any = source
    for part in dotted_key.split("."):
        # A non-dict intermediate (e.g. "host" stored as a plain string) simply
        # means the nested path does not exist.
        node = node.get(part) if isinstance(node, dict) else None
        if node is None:
            break
    return node or source.get(dotted_key)


def build_prompt(logs: Iterable[dict[str, Any]], limit_messages: int) -> str:
    """Create the prompt that will be sent to the LLM.

    Each hit is rendered as a header line (timestamp, host, level, container
    image) followed by the log message. Fields are read from ``_source`` in
    either nested form (``{"host": {"name": ...}}``) or flat dotted form
    (``{"host.name": ...}``); missing fields fall back to placeholders.

    Args:
        logs: Elasticsearch hits, each expected to carry a ``_source`` mapping.
        limit_messages: Maximum number of hits to include; values <= 0 include
            none.

    Returns:
        The full prompt text, or a fixed "No logs were returned ..." sentence
        when no hit was included.
    """
    selected = []
    for idx, hit in enumerate(logs):
        if idx >= limit_messages:
            break
        # Tolerate hits without a usable _source instead of raising.
        source = hit.get("_source") if isinstance(hit, dict) else None
        if not isinstance(source, dict):
            source = {}
        message = _lookup_field(source, "message") or _lookup_field(source, "event.original") or ""
        timestamp = source.get("@timestamp", "unknown time")
        host = (
            _lookup_field(source, "host.name")
            or _lookup_field(source, "host.hostname")
            or "unknown-host"
        )
        container = _lookup_field(source, "container.image.name") or ""
        level = _lookup_field(source, "log.level") or ""
        selected.append(
            f"[{timestamp}] host={host} level={level} container={container}\n{message}".strip()
        )
    if not selected:
        return "No logs were returned from Elasticsearch in the requested window."
    prompt = (
        "You are assisting with HomeLab observability. Review the following log entries collected from "
        "Elasticsearch and highlight any notable anomalies, errors, or emerging issues. "
        "Explain the impact and suggest next steps when applicable. "
        "Use concise bullet points. Logs:\n\n"
        + "\n\n".join(selected)
    )
    return prompt
def call_openrouter(prompt: str, model: str, api_key: str, referer: str | None, title: str | None) -> str:
    """Send the prompt to OpenRouter and return the model's response text.

    Args:
        prompt: User message content for the chat completion.
        model: OpenRouter model identifier.
        api_key: Bearer token for the OpenRouter API.
        referer: Optional value for the ``HTTP-Referer`` header.
        title: Optional value for the ``X-Title`` header.

    Returns:
        The ``content`` string of the first choice's message.

    Raises:
        requests.HTTPError: If OpenRouter answers with an error status.
        requests.RequestException: For connection problems and timeouts.
        RuntimeError: If the response body carries an error object, has no
            choices, or the first choice has no textual message content.
    """
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    if referer:
        headers["HTTP-Referer"] = referer
    if title:
        headers["X-Title"] = title
    body = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a senior SRE helping analyze log anomalies."},
            {"role": "user", "content": prompt},
        ],
    }
    response = requests.post(url, json=body, headers=headers, timeout=60)
    response.raise_for_status()
    data = response.json()
    if not isinstance(data, dict):
        raise RuntimeError("OpenRouter returned an unexpected response payload")
    # NOTE(review): OpenRouter can report provider failures inside the body of
    # an otherwise successful response; surface that message instead of the
    # generic "no choices" error. Confirm against the current API docs.
    error = data.get("error")
    if error:
        detail = error.get("message") if isinstance(error, dict) else error
        raise RuntimeError(f"OpenRouter returned an error: {detail}")
    choices = data.get("choices") or []
    if not choices:
        raise RuntimeError("OpenRouter response did not include choices")
    first = choices[0]
    message = first.get("message") if isinstance(first, dict) else None
    content = message.get("content") if isinstance(message, dict) else None
    # content may be null or absent; callers rely on getting a str back.
    if not isinstance(content, str):
        raise RuntimeError("OpenRouter response did not include message content")
    return content
def parse_args() -> argparse.Namespace:
    """Parse command-line options, using environment variables as defaults.

    ``--host``, ``--api-key`` and ``--openrouter-model`` default to the
    ``ELASTIC_HOST``, ``ELASTIC_API_KEY`` and ``OPENROUTER_MODEL`` environment
    variables respectively.
    """
    env = os.environ
    # (flags, add_argument keyword options), registered in this order.
    option_table = (
        (("--host",), {"default": env.get("ELASTIC_HOST"), "help": "Elasticsearch host URL"}),
        (("--api-key",), {"default": env.get("ELASTIC_API_KEY"), "help": "Elasticsearch ApiKey"}),
        (("--index",), {"default": "log*", "help": "Index pattern (default: log*)"}),
        (("--minutes",), {"type": int, "default": 60, "help": "Lookback window in minutes (default: 60)"}),
        (("--size",), {"type": int, "default": 200, "help": "Max number of logs to fetch (default: 200)"}),
        (("--message-limit",), {"type": int, "default": 50, "help": "Max log lines sent to LLM (default: 50)"}),
        (("--openrouter-model",), {"default": env.get("OPENROUTER_MODEL", "openai/gpt-4o-mini")}),
        (("--insecure",), {"action": "store_true", "help": "Disable TLS verification for Elasticsearch"}),
    )
    cli = argparse.ArgumentParser(description="Query Elasticsearch and summarize logs with OpenRouter.")
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    return cli.parse_args()
def main() -> int:
    """Run one check: query Elasticsearch, summarize via OpenRouter, print the result.

    Returns:
        0 when a summary (or the "no logs" notice) was printed; 1 when required
        configuration is missing or one of the remote calls failed. Failures
        are reported as a single line on stderr rather than a traceback, since
        the script is meant to run unattended on a schedule.
    """
    args = parse_args()
    if not args.host or not args.api_key:
        print("ELASTIC_HOST and ELASTIC_API_KEY must be provided via environment or CLI", file=sys.stderr)
        return 1
    try:
        logs = query_elasticsearch(
            host=args.host,
            api_key=args.api_key,
            index_pattern=args.index,
            minutes=args.minutes,
            size=args.size,
            verify=not args.insecure,
        )
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers a non-JSON response body on older requests versions.
        print(f"Elasticsearch query failed: {exc}", file=sys.stderr)
        return 1
    prompt = build_prompt(logs, limit_messages=args.message_limit)
    # build_prompt() returns a fixed "No logs ..." sentence when there is
    # nothing to summarize; print it and skip the LLM call.
    if not prompt.strip() or prompt.startswith("No logs"):
        print(prompt)
        return 0
    openrouter_key = os.environ.get("OPENROUTER_API_KEY")
    if not openrouter_key:
        print("OPENROUTER_API_KEY is required to summarize logs", file=sys.stderr)
        return 1
    referer = os.environ.get("OPENROUTER_REFERER")
    title = os.environ.get("OPENROUTER_TITLE", "Elastic Log Monitor")
    try:
        response_text = call_openrouter(
            prompt=prompt,
            model=args.openrouter_model,
            api_key=openrouter_key,
            referer=referer,
            title=title,
        )
    except (requests.RequestException, RuntimeError, ValueError, KeyError, TypeError) as exc:
        # KeyError/TypeError: response JSON did not have the expected shape.
        print(f"OpenRouter request failed: {exc}", file=sys.stderr)
        return 1
    # Guard against a null content value so .strip() cannot fail.
    print((response_text or "").strip())
    return 0
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())