#!/usr/bin/env python3
"""
Log anomaly checker that queries Elasticsearch and asks an OpenRouter-hosted LLM
for a quick triage summary. Intended to be run on a schedule (cron/systemd).

Required environment variables:
    ELASTIC_HOST         e.g. https://casper.localdomain:9200
    ELASTIC_API_KEY      Base64 ApiKey used for Elasticsearch requests
    OPENROUTER_API_KEY   Token for https://openrouter.ai/

Optional environment variables:
    OPENROUTER_MODEL     Model identifier (default: openai/gpt-4o-mini)
    OPENROUTER_REFERER   Passed through as HTTP-Referer header
    OPENROUTER_TITLE     Passed through as X-Title header
"""
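# Example one-off run (values illustrative; the script name and paths are
# assumptions, adjust them to your checkout):
#
#   ELASTIC_HOST=https://casper.localdomain:9200 \
#   ELASTIC_API_KEY=... \
#   OPENROUTER_API_KEY=... \
#   ./log_anomaly_check.py --index "log*" --minutes 30 --message-limit 40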
from __future__ import annotations

import argparse
import datetime as dt
import os
import sys
from typing import Any, Iterable

import requests


def utc_iso(ts: dt.datetime) -> str:
    """Return an ISO 8601 string with a Z suffix."""
    return ts.replace(microsecond=0).isoformat() + "Z"


def query_elasticsearch(
    host: str,
    api_key: str,
    index_pattern: str,
    minutes: int,
    size: int,
    verify: bool,
) -> list[dict[str, Any]]:
    """Fetch recent logs from Elasticsearch."""
    end = dt.datetime.utcnow()
    start = end - dt.timedelta(minutes=minutes)
    url = f"{host.rstrip('/')}/{index_pattern}/_search"
    payload = {
        "size": size,
        "sort": [{"@timestamp": {"order": "desc"}}],
        "query": {
            "range": {
                "@timestamp": {
                    "gte": utc_iso(start),
                    "lte": utc_iso(end),
                }
            }
        },
        # Request the fallback fields that build_prompt() also checks
        # (event.original, host.hostname) so those lookups can succeed.
        "_source": [
            "@timestamp",
            "message",
            "event.original",
            "host.name",
            "host.hostname",
            "container.image.name",
            "log.level",
        ],
    }
    headers = {
        "Authorization": f"ApiKey {api_key}",
        "Content-Type": "application/json",
    }
    response = requests.post(url, json=payload, headers=headers, timeout=30, verify=verify)
    response.raise_for_status()
    return response.json().get("hits", {}).get("hits", [])
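# For reference, query_elasticsearch() issues roughly the following request
# (sketch only; the timestamps and API key are placeholders):
#
#   curl -sk -X POST "https://casper.localdomain:9200/log*/_search" \
#     -H "Authorization: ApiKey $ELASTIC_API_KEY" \
#     -H "Content-Type: application/json" \
#     -d '{"size": 200, "sort": [{"@timestamp": {"order": "desc"}}],
#          "query": {"range": {"@timestamp": {"gte": "...", "lte": "..."}}}}'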

def build_prompt(logs: Iterable[dict[str, Any]], limit_messages: int) -> str:
    """Create the prompt that will be sent to the LLM."""
    selected = []
    for idx, hit in enumerate(logs):
        if idx >= limit_messages:
            break
        source = hit.get("_source", {})
        message = source.get("message") or source.get("event", {}).get("original") or ""
        timestamp = source.get("@timestamp", "unknown time")
        host = source.get("host", {}).get("name") or source.get("host", {}).get("hostname") or "unknown-host"
        container = source.get("container", {}).get("image", {}).get("name") or ""
        level = source.get("log", {}).get("level") or source.get("log.level") or ""
        selected.append(
            f"[{timestamp}] host={host} level={level} container={container}\n{message}".strip()
        )

    if not selected:
        return "No logs were returned from Elasticsearch in the requested window."

    prompt = (
        "You are assisting with HomeLab observability. Review the following log entries collected from "
        "Elasticsearch and highlight any notable anomalies, errors, or emerging issues. "
        "Explain the impact and suggest next steps when applicable. "
        "Use concise bullet points. Logs:\n\n"
        + "\n\n".join(selected)
    )
    return prompt
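# Each selected entry is rendered roughly like this before being joined into
# the prompt (the values below are purely illustrative):
#
#   [2025-01-01T00:00:00Z] host=docker-host-01 level=error container=nginx:latest
#   upstream timed out (110: Connection timed out) while reading response header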

def call_openrouter(prompt: str, model: str, api_key: str, referer: str | None, title: str | None) -> str:
    """Send prompt to OpenRouter and return the model response text."""
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    if referer:
        headers["HTTP-Referer"] = referer
    if title:
        headers["X-Title"] = title

    body = {
        "model": model,
        "messages": [
            {"role": "system", "content": "You are a senior SRE helping analyze log anomalies."},
            {"role": "user", "content": prompt},
        ],
    }

    response = requests.post(url, json=body, headers=headers, timeout=60)
    response.raise_for_status()
    data = response.json()
    choices = data.get("choices", [])
    if not choices:
        raise RuntimeError("OpenRouter response did not include choices")
    return choices[0]["message"]["content"]
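# The parsing above expects the OpenAI-style chat completion shape returned by
# OpenRouter, abridged roughly as:
#
#   {
#     "choices": [
#       {"message": {"role": "assistant", "content": "..."}}
#     ]
#   }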

def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Query Elasticsearch and summarize logs with OpenRouter.")
    parser.add_argument("--host", default=os.environ.get("ELASTIC_HOST"), help="Elasticsearch host URL")
    parser.add_argument("--api-key", default=os.environ.get("ELASTIC_API_KEY"), help="Elasticsearch ApiKey")
    parser.add_argument("--index", default="log*", help="Index pattern (default: log*)")
    parser.add_argument("--minutes", type=int, default=60, help="Lookback window in minutes (default: 60)")
    parser.add_argument("--size", type=int, default=200, help="Max number of logs to fetch (default: 200)")
    parser.add_argument("--message-limit", type=int, default=50, help="Max log lines sent to LLM (default: 50)")
    parser.add_argument(
        "--openrouter-model",
        default=os.environ.get("OPENROUTER_MODEL", "openai/gpt-4o-mini"),
        help="OpenRouter model identifier (default: openai/gpt-4o-mini)",
    )
    parser.add_argument("--insecure", action="store_true", help="Disable TLS verification for Elasticsearch")
    return parser.parse_args()

def main() -> int:
    args = parse_args()
    if not args.host or not args.api_key:
        print("ELASTIC_HOST and ELASTIC_API_KEY must be provided via environment or CLI", file=sys.stderr)
        return 1

    logs = query_elasticsearch(
        host=args.host,
        api_key=args.api_key,
        index_pattern=args.index,
        minutes=args.minutes,
        size=args.size,
        verify=not args.insecure,
    )

    prompt = build_prompt(logs, limit_messages=args.message_limit)
    if not prompt.strip() or prompt.startswith("No logs"):
        print(prompt)
        return 0

    openrouter_key = os.environ.get("OPENROUTER_API_KEY")
    if not openrouter_key:
        print("OPENROUTER_API_KEY is required to summarize logs", file=sys.stderr)
        return 1

    referer = os.environ.get("OPENROUTER_REFERER")
    title = os.environ.get("OPENROUTER_TITLE", "Elastic Log Monitor")
    response_text = call_openrouter(
        prompt=prompt,
        model=args.openrouter_model,
        api_key=openrouter_key,
        referer=referer,
        title=title,
    )
    print(response_text.strip())
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
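# Example cron entry for the scheduled run mentioned in the module docstring
# (schedule, install path, and script name are assumptions; the required
# environment variables would normally be set at the top of the crontab or
# loaded from an env file):
#
#   */30 * * * *  /opt/scripts/log_anomaly_check.py --minutes 30 >> /var/log/log_anomaly.log 2>&1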