935 lines
35 KiB
Python

import base64
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import parse_qs, unquote, urlparse
import requests
from dotenv import load_dotenv
from flask import Flask, abort, jsonify, render_template, request
BASE_DIR = Path(__file__).resolve().parent.parent
env_path = BASE_DIR / ".env"
if env_path.exists():
load_dotenv(env_path)
ES_URL = os.getenv("ES_URL", "http://localhost:9200").rstrip("/")
ES_VERIFY_SSL = os.getenv("ES_VERIFY_SSL", "false").lower() == "true"
app = Flask(__name__)
HOST_SEARCH_LIMIT = int(os.getenv("FRONTEND_HOST_LIMIT", "1000"))
DEFAULT_EVENT_LIMIT = int(os.getenv("FRONTEND_EVENT_LIMIT", "200"))
SERVER_VERSION = os.getenv("NETWORK_MCP_VERSION", "0.1.0")
REST_TOOLS = [
{
"name": "list_hosts",
"description": "Return the merged view of every known device on the network (searchable by hostname, IP, or MAC).",
"method": "GET",
"path": "/api/hosts",
},
{
"name": "network_map",
"description": "Summarize hosts grouped by detected /24 (IPv4) or /64 (IPv6) networks.",
"method": "GET",
"path": "/api/map",
},
{
"name": "get_host",
"description": "Fetch a single host document by ID (e.g. ip:192.168.5.10).",
"method": "GET",
"path": "/api/hosts/{host_id}",
},
{
"name": "list_events",
"description": "List recent scan/discovery events with filters for host, type, or time range.",
"method": "GET",
"path": "/api/events",
},
{
"name": "host_events",
"description": "List the recent events associated with a specific host.",
"method": "GET",
"path": "/api/hosts/{host_id}/events",
},
]
def tool_schema(description: str, properties: Dict[str, Any], required: Optional[List[str]] = None, title: Optional[str] = None):
schema: Dict[str, Any] = {
"type": "object",
"description": description,
"properties": properties,
"additionalProperties": False,
}
if required:
schema["required"] = required
if title:
schema["title"] = title
return schema
PORT_SCHEMA = tool_schema(
"Observed port entry.",
{
"port": {"type": "integer", "description": "Port number."},
"state": {"type": "string", "description": "State reported by nmap (e.g. open, closed)."},
"service": {"type": "string", "description": "Detected service name, if available."},
},
required=["port"],
title="Port",
)
HOST_SCHEMA = tool_schema(
"Host summary merged from inventory, OPNsense, and nmap.",
{
"id": {"type": "string", "description": "Stable host identifier (ip:* or mac:*)."},
"name": {"type": "string", "description": "Best-known display name."},
"ips": {"type": "array", "items": {"type": "string"}, "description": "Associated IP addresses."},
"macs": {"type": "array", "items": {"type": "string"}, "description": "Observed MAC addresses."},
"hostnames": {"type": "array", "items": {"type": "string"}, "description": "DNS or hostnames discovered."},
"sources": {"type": "array", "items": {"type": "string"}, "description": "Data sources contributing to this record."},
"last_seen": {"type": "string", "description": "ISO timestamp of the most recent observation."},
"notes": {"type": "string", "description": "Inventory notes/annotations, if present."},
"expected_ports": {"type": "array", "items": {"type": "string"}, "description": "Ports expected per inventory targets."},
"ports": {"type": "array", "items": PORT_SCHEMA, "description": "Latest observed open ports."},
},
required=["id"],
title="Host",
)
EVENT_SCHEMA = tool_schema(
"Scan or discovery event emitted by collectors.",
{
"id": {"type": "string", "description": "Event document identifier."},
"timestamp": {"type": "string", "description": "Observation timestamp (@timestamp)."},
"source": {"type": "string", "description": "Collector that produced the event (nmap, opnsense, inventory)."},
"event": {"type": "object", "description": "Event metadata (type, outcome)."},
"host": HOST_SCHEMA,
"ports": {"type": "array", "items": PORT_SCHEMA, "description": "Ports included with the event (if any)."},
},
required=["id", "timestamp"],
title="Event",
)
NETWORK_ENTRY_SCHEMA = tool_schema(
"Network grouping entry showing hosts per /24 or /64.",
{
"cidr": {"type": "string", "description": "CIDR label (e.g. 192.168.5.0/24)."},
"hosts": {"type": "array", "items": HOST_SCHEMA, "description": "Hosts that belong to this network."},
},
required=["cidr", "hosts"],
title="NetworkEntry",
)
MCP_TOOL_DEFINITIONS = {
"list_hosts": {
"title": "List Hosts",
"description": "Return the merged view of every known device on the network with optional filtering by source or identifier.",
"annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False},
"inputSchema": tool_schema(
"Filter options when listing hosts.",
{
"limit": {"type": "integer", "minimum": 1, "maximum": 5000, "title": "Limit", "description": "Maximum number of hosts to return."},
"source": {"type": "string", "title": "Source filter", "description": "Only include hosts that contain this source tag (e.g. inventory, nmap, opnsense-arp)."},
"terms": {
"type": "array",
"items": {"type": "string"},
"title": "Search terms",
"description": "Identifiers (names, hostnames, IPs, or MACs) to match. Equivalent to repeated q parameters in the REST API.",
},
},
title="ListHostsInput",
),
"outputSchema": tool_schema(
"Host list result payload.",
{
"total": {"type": "integer", "description": "Number of hosts returned."},
"hosts": {"type": "array", "items": HOST_SCHEMA, "description": "Host entries sorted by last-seen time."},
},
required=["total", "hosts"],
title="ListHostsResult",
),
},
"network_map": {
"title": "Network Map",
"description": "Summarize hosts grouped by detected /24 (IPv4) or /64 (IPv6) ranges.",
"annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False},
"inputSchema": tool_schema(
"Options when generating the network grouping.",
{
"limit": {"type": "integer", "minimum": 1, "maximum": 5000, "title": "Host limit", "description": "Maximum number of hosts to consider when building the map."},
},
title="NetworkMapInput",
),
"outputSchema": tool_schema(
"Grouped view of networks and their hosts.",
{
"host_count": {"type": "integer", "description": "Number of hosts examined for this map."},
"networks": {"type": "array", "items": NETWORK_ENTRY_SCHEMA, "description": "List of network segments and their hosts."},
},
required=["host_count", "networks"],
title="NetworkMapResult",
),
},
"get_host": {
"title": "Get Host",
"description": "Fetch a single host document by ID, optionally including recent events.",
"annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False},
"inputSchema": tool_schema(
"Parameters for retrieving an individual host.",
{
"host_id": {"type": "string", "title": "Host ID", "description": "Host identifier (e.g. ip:192.168.5.10, mac:aa:bb:cc...)."},
"include_events": {"type": "boolean", "title": "Include events", "description": "If true, include recent events for the host."},
"events_limit": {"type": "integer", "minimum": 1, "maximum": 1000, "title": "Events limit", "description": "Number of events to include if requested."},
},
required=["host_id"],
title="GetHostInput",
),
"outputSchema": tool_schema(
"Host payload with optional embedded events.",
{
"host": HOST_SCHEMA,
"events": {"type": "array", "items": EVENT_SCHEMA, "description": "Recent events when include_events=true."},
},
required=["host"],
title="GetHostResult",
),
},
"list_events": {
"title": "List Events",
"description": "List recent scan/discovery events with optional filters.",
"annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False},
"inputSchema": tool_schema(
"Filters applied when querying events.",
{
"limit": {"type": "integer", "minimum": 1, "maximum": 1000, "title": "Limit", "description": "Maximum number of events to return."},
"host_id": {"type": "string", "title": "Host filter", "description": "Only include events for this host identifier."},
"type": {"type": "string", "title": "Event type", "description": "Restrict to a specific event type (e.g. scan, discovery)."},
"since": {"type": "string", "title": "Since timestamp", "description": "ISO8601 timestamp used as a lower bound for @timestamp."},
},
title="ListEventsInput",
),
"outputSchema": tool_schema(
"Event search result.",
{
"total": {"type": "integer", "description": "Number of events returned."},
"events": {"type": "array", "items": EVENT_SCHEMA, "description": "Event documents sorted by timestamp."},
},
required=["total", "events"],
title="ListEventsResult",
),
},
"host_events": {
"title": "Host Events",
"description": "List recent events associated with a specific host.",
"annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False},
"inputSchema": tool_schema(
"Parameters when retrieving events bound to a single host.",
{
"host_id": {"type": "string", "title": "Host ID", "description": "Host identifier to filter by."},
"limit": {"type": "integer", "minimum": 1, "maximum": 1000, "title": "Limit", "description": "Maximum number of events to return."},
"type": {"type": "string", "title": "Event type", "description": "Restrict to a specific event type (e.g. scan, discovery)."},
"since": {"type": "string", "title": "Since timestamp", "description": "ISO8601 timestamp used as a lower bound for @timestamp."},
},
required=["host_id"],
title="HostEventsInput",
),
"outputSchema": tool_schema(
"Event list scoped to a host.",
{
"total": {"type": "integer", "description": "Number of events returned for the host."},
"events": {"type": "array", "items": EVENT_SCHEMA, "description": "Host-specific event entries."},
},
required=["total", "events"],
title="HostEventsResult",
),
},
}
def resolve_api_key(api_id: str, api_key: str):
if api_id and api_key:
return api_id, api_key
if not api_key:
return None, None
if ":" in api_key:
possible_id, possible_key = api_key.split(":", 1)
return possible_id, possible_key
try:
decoded = base64.b64decode(api_key).decode()
if ":" in decoded:
possible_id, possible_key = decoded.split(":", 1)
return possible_id, possible_key
except Exception:
pass
return None, None
def build_es_request():
headers = {}
auth = None
api_id = os.getenv("ES_API_ID")
api_key = os.getenv("ES_API_KEY")
api_id, api_key = resolve_api_key(api_id, api_key)
if api_id and api_key:
token = base64.b64encode(f"{api_id}:{api_key}".encode()).decode()
headers["Authorization"] = f"ApiKey {token}"
else:
auth = (os.getenv("ES_USER", "elastic"), os.getenv("ES_PASS", "changeme"))
return headers, auth
def normalize_host(doc: Dict) -> Dict:
host = doc.get("host", {})
ports = doc.get("ports", [])
return {
"id": host.get("id"),
"name": host.get("name") or host.get("id"),
"ips": host.get("ips", []),
"macs": host.get("macs", []),
"hostnames": host.get("hostnames", []),
"sources": host.get("sources", []),
"last_seen": host.get("last_seen"),
"notes": host.get("notes"),
"expected_ports": host.get("expected_ports", []),
"ports": [
{
"port": p.get("port"),
"state": p.get("state"),
"service": (p.get("service") or {}).get("name"),
}
for p in ports
],
}
def parse_search_terms(raw_terms: List[str]) -> List[str]:
terms: List[str] = []
for raw in raw_terms:
if not raw:
continue
cleaned = raw.replace(",", " ")
for chunk in cleaned.split():
chunk = chunk.strip()
if chunk:
terms.append(chunk)
return terms
def coerce_string_list(value: Any) -> List[str]:
if value is None:
return []
if isinstance(value, str):
return [value]
if isinstance(value, (list, tuple)):
return [str(item) for item in value if item is not None]
return []
def clamp_int(value: Any, default: int, min_value: int, max_value: int) -> int:
try:
if value is None:
return default
parsed = int(value)
except (TypeError, ValueError):
return default
return max(min_value, min(max_value, parsed))
def coerce_bool(value: Any, default: bool = False) -> bool:
if value is None:
return default
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.lower() in {"1", "true", "yes", "on"}
return default
def build_search_clause(term: str) -> Dict:
wildcard = f"*{term}*"
return {
"bool": {
"should": [
{"wildcard": {"host.name.keyword": {"value": wildcard, "case_insensitive": True}}},
{"wildcard": {"host.hostnames.keyword": {"value": wildcard, "case_insensitive": True}}},
{"wildcard": {"host.id.keyword": {"value": wildcard, "case_insensitive": True}}},
{"wildcard": {"host.ips": {"value": wildcard, "case_insensitive": True}}},
{"wildcard": {"host.macs": {"value": wildcard, "case_insensitive": True}}},
],
"minimum_should_match": 1,
}
}
def fetch_hosts(limit: int = HOST_SEARCH_LIMIT, source: Optional[str] = None, search_terms: Optional[List[str]] = None):
headers, auth = build_es_request()
body = {
"size": limit,
"sort": [{"host.last_seen": {"order": "desc"}}],
}
filters: List[Dict] = []
if source:
filters.append({"term": {"host.sources.keyword": source}})
if search_terms:
should_clauses = [build_search_clause(term) for term in search_terms]
filters.append({"bool": {"should": should_clauses, "minimum_should_match": 1}})
if filters:
body["query"] = {"bool": {"filter": filters}}
resp = requests.get(
f"{ES_URL}/network-hosts/_search",
json=body,
headers=headers,
auth=auth,
verify=ES_VERIFY_SSL,
)
resp.raise_for_status()
return [normalize_host(hit.get("_source", {})) for hit in resp.json()["hits"]["hits"]]
def fetch_host_by_id(host_id: str) -> Optional[Dict]:
headers, auth = build_es_request()
body = {"size": 1, "query": {"term": {"host.id.keyword": host_id}}}
resp = requests.get(
f"{ES_URL}/network-hosts/_search",
json=body,
headers=headers,
auth=auth,
verify=ES_VERIFY_SSL,
)
resp.raise_for_status()
hits = resp.json()["hits"]["hits"]
if not hits:
return None
return normalize_host(hits[0].get("_source", {}))
def fetch_events(host_id: Optional[str] = None, limit: int = DEFAULT_EVENT_LIMIT, event_type: Optional[str] = None, since: Optional[str] = None):
headers, auth = build_es_request()
filters: List[Dict] = []
if host_id:
filters.append({"term": {"host.id.keyword": host_id}})
if event_type:
filters.append({"term": {"event.type.keyword": event_type}})
if since:
filters.append({"range": {"@timestamp": {"gte": since}}})
body: Dict = {
"size": limit,
"sort": [{"@timestamp": {"order": "desc"}}],
}
if filters:
body["query"] = {"bool": {"filter": filters}}
resp = requests.get(
f"{ES_URL}/network-events-*/_search",
json=body,
headers=headers,
auth=auth,
verify=ES_VERIFY_SSL,
)
if resp.status_code == 404:
return []
resp.raise_for_status()
events = []
for hit in resp.json()["hits"]["hits"]:
doc = hit.get("_source", {})
events.append(
{
"id": hit.get("_id"),
"timestamp": doc.get("@timestamp"),
"event": doc.get("event", {}),
"host": doc.get("host", {}),
"observed": doc.get("observed"),
"scan": doc.get("scan"),
"ports": doc.get("ports", []),
"source": doc.get("source"),
}
)
return events
def derive_network_label(ip: str) -> str:
if not ip:
return "unknown"
if ":" in ip:
parts = ip.split(":")
prefix = ":".join(parts[:4])
return f"{prefix}::/64"
octets = ip.split(".")
if len(octets) == 4:
return f"{octets[0]}.{octets[1]}.{octets[2]}.0/24"
return "unknown"
def build_network_map(hosts: List[Dict]):
networks: Dict[str, Dict] = {}
for host in hosts:
seen = set()
for ip in host.get("ips", []):
label = derive_network_label(ip)
if label in seen:
continue
seen.add(label)
entry = networks.setdefault(label, {"cidr": label, "hosts": []})
entry["hosts"].append(
{
"id": host.get("id"),
"name": host.get("name"),
"ips": host.get("ips", []),
"sources": host.get("sources", []),
"last_seen": host.get("last_seen"),
}
)
sorted_networks = sorted(networks.values(), key=lambda n: n["cidr"])
for entry in sorted_networks:
entry["hosts"].sort(key=lambda h: h.get("name") or h.get("id") or "")
return sorted_networks
def bool_arg(value: Optional[str], default: bool = False) -> bool:
if value is None:
return default
return value.lower() in {"1", "true", "yes", "on"}
def build_manifest(base_url: str) -> Dict:
base = base_url.rstrip("/")
tools = []
for tool in REST_TOOLS:
tools.append(
{
"name": tool["name"],
"description": tool["description"],
"method": tool["method"],
"path": tool["path"],
"url": f"{base}{tool['path']}",
}
)
return {
"name": "network-mcp",
"description": "Network discovery source-of-truth backed by Elasticsearch, Nmap, and OPNsense.",
"schema": "1.0",
"tools": tools,
"auth": "env",
}
def tool_result(summary: str, data: Dict[str, Any]):
return summary, data
def handle_tool_list_hosts(arguments: Dict[str, Any]):
limit = clamp_int(arguments.get("limit"), HOST_SEARCH_LIMIT, 1, 5000)
raw_terms = coerce_string_list(arguments.get("terms"))
search_terms = parse_search_terms(raw_terms)
hosts = fetch_hosts(limit=limit, source=arguments.get("source"), search_terms=search_terms or None)
return tool_result(f"Returned {len(hosts)} hosts.", {"hosts": hosts, "total": len(hosts)})
def handle_tool_network_map(arguments: Dict[str, Any]):
limit = clamp_int(arguments.get("limit"), HOST_SEARCH_LIMIT, 1, 5000)
hosts = fetch_hosts(limit=limit)
network_map = build_network_map(hosts)
return tool_result(f"Computed {len(network_map)} networks.", {"networks": network_map, "host_count": len(hosts)})
def handle_tool_get_host(arguments: Dict[str, Any]):
host_id = arguments.get("host_id")
if not host_id:
raise ValueError("host_id is required")
host = fetch_host_by_id(host_id)
if not host:
raise KeyError(f"Host {host_id} not found")
include_events = coerce_bool(arguments.get("include_events"), default=False)
result = {"host": host}
if include_events:
events_limit = clamp_int(arguments.get("events_limit"), DEFAULT_EVENT_LIMIT, 1, 1000)
result["events"] = fetch_events(host_id=host_id, limit=events_limit)
return tool_result(f"Fetched host {host_id}.", result)
def handle_tool_list_events(arguments: Dict[str, Any]):
limit = clamp_int(arguments.get("limit"), DEFAULT_EVENT_LIMIT, 1, 1000)
events = fetch_events(
host_id=arguments.get("host_id"),
limit=limit,
event_type=arguments.get("type"),
since=arguments.get("since"),
)
return tool_result(f"Returned {len(events)} events.", {"events": events, "total": len(events)})
def handle_tool_host_events(arguments: Dict[str, Any]):
host_id = arguments.get("host_id")
if not host_id:
raise ValueError("host_id is required")
limit = clamp_int(arguments.get("limit"), DEFAULT_EVENT_LIMIT, 1, 1000)
events = fetch_events(host_id=host_id, limit=limit, event_type=arguments.get("type"), since=arguments.get("since"))
return tool_result(f"Returned {len(events)} events for {host_id}.", {"events": events, "total": len(events)})
TOOL_HANDLERS = {
"list_hosts": handle_tool_list_hosts,
"network_map": handle_tool_network_map,
"get_host": handle_tool_get_host,
"list_events": handle_tool_list_events,
"host_events": handle_tool_host_events,
}
def list_mcp_tools():
tools = []
for name, meta in MCP_TOOL_DEFINITIONS.items():
tool = {
"name": name,
"description": meta.get("description"),
"inputSchema": meta.get("inputSchema", {"type": "object"}),
}
title = meta.get("title")
if title:
tool["title"] = title
output_schema = meta.get("outputSchema")
if output_schema:
tool["outputSchema"] = output_schema
annotations = meta.get("annotations")
if annotations:
tool["annotations"] = annotations
tools.append(tool)
return tools
def call_tool_by_name(name: str, arguments: Optional[Dict[str, Any]] = None):
if name not in TOOL_HANDLERS:
raise KeyError(f"Unknown tool: {name}")
handler = TOOL_HANDLERS[name]
summary, data = handler(arguments or {})
return summary, data
def list_mcp_resources(base_uri: str = "network://"):
return [
{
"uri": f"{base_uri}hosts",
"name": "hosts",
"title": "Hosts (Snapshot)",
"mimeType": "application/json",
"description": "Snapshot of merged hosts (inventory + opnsense + nmap). Use resources/templates/list for search parameters.",
},
{
"uri": f"{base_uri}map",
"name": "map",
"title": "Network Map (Snapshot)",
"mimeType": "application/json",
"description": "Snapshot of networks grouped by /24 (IPv4) or /64 (IPv6).",
},
{
"uri": f"{base_uri}events",
"name": "events",
"title": "Recent Events (Snapshot)",
"mimeType": "application/json",
"description": "Recent scan/discovery events. Use resources/templates/list for filters (host_id/type/since).",
},
]
def list_mcp_resource_templates(base_uri: str = "network://"):
return [
{
"uriTemplate": f"{base_uri}hosts{{?q,source,limit}}",
"name": "hosts_query",
"title": "Hosts Query",
"mimeType": "application/json",
"description": "Query hosts by q (hostname/IP/MAC/name, case-insensitive), source, and limit. Repeat q to provide multiple terms.",
},
{
"uriTemplate": f"{base_uri}host/{{host_id}}{{?include_events,events_limit}}",
"name": "host_detail",
"title": "Host Detail",
"mimeType": "application/json",
"description": "Fetch a single host by host_id (e.g. mac:aa:bb.. or ip:192.168.5.10). Optionally include events.",
},
{
"uriTemplate": f"{base_uri}events{{?host_id,type,since,limit}}",
"name": "events_query",
"title": "Events Query",
"mimeType": "application/json",
"description": "Query recent events with optional filters host_id, type, since (ISO8601), and limit.",
},
{
"uriTemplate": f"{base_uri}map{{?limit}}",
"name": "map_query",
"title": "Network Map",
"mimeType": "application/json",
"description": "Build a network map from up to limit hosts.",
},
]
def read_mcp_resource(uri: str):
parsed = urlparse(uri)
if parsed.scheme != "network":
raise ValueError(f"Unsupported resource URI scheme: {parsed.scheme}")
netloc = parsed.netloc
query = parse_qs(parsed.query or "")
if netloc == "hosts":
limit = clamp_int((query.get("limit") or [HOST_SEARCH_LIMIT])[0], HOST_SEARCH_LIMIT, 1, 5000)
source = (query.get("source") or [None])[0]
q_terms = query.get("q") or []
search_terms = parse_search_terms(q_terms)
payload = {"hosts": fetch_hosts(limit=limit, source=source, search_terms=search_terms or None)}
payload["total"] = len(payload["hosts"])
return {"contents": [{"uri": uri, "mimeType": "application/json", "text": json.dumps(payload)}]}
if netloc == "map":
limit = clamp_int((query.get("limit") or [HOST_SEARCH_LIMIT])[0], HOST_SEARCH_LIMIT, 1, 5000)
hosts = fetch_hosts(limit=limit)
payload = {"networks": build_network_map(hosts), "host_count": len(hosts)}
return {"contents": [{"uri": uri, "mimeType": "application/json", "text": json.dumps(payload)}]}
if netloc == "events":
limit = clamp_int((query.get("limit") or [DEFAULT_EVENT_LIMIT])[0], DEFAULT_EVENT_LIMIT, 1, 1000)
host_id = (query.get("host_id") or [None])[0]
event_type = (query.get("type") or [None])[0]
since = (query.get("since") or [None])[0]
events = fetch_events(host_id=host_id, limit=limit, event_type=event_type, since=since)
payload = {"events": events, "total": len(events)}
return {"contents": [{"uri": uri, "mimeType": "application/json", "text": json.dumps(payload)}]}
if netloc == "host":
host_id = unquote((parsed.path or "").lstrip("/"))
if not host_id:
raise ValueError("Host resource requires /<host_id> path")
include_events = coerce_bool((query.get("include_events") or [False])[0], default=False)
events_limit = clamp_int((query.get("events_limit") or [DEFAULT_EVENT_LIMIT])[0], DEFAULT_EVENT_LIMIT, 1, 1000)
host = fetch_host_by_id(host_id)
if not host:
raise KeyError(f"Host {host_id} not found")
payload = {"host": host}
if include_events:
payload["events"] = fetch_events(host_id=host_id, limit=events_limit)
return {"contents": [{"uri": uri, "mimeType": "application/json", "text": json.dumps(payload)}]}
raise ValueError(f"Unknown resource URI: {uri}")
def jsonrpc_error(rpc_id: Any, code: int, message: str):
return {
"jsonrpc": "2.0",
"id": rpc_id,
"error": {"code": code, "message": message},
}
def build_initialize_result(protocol_version: Optional[str] = None):
protocol_version = protocol_version or "2025-11-25"
return {
"protocolVersion": protocol_version,
"capabilities": {
"tools": {"listChanged": False},
"resources": {"listChanged": False, "subscribe": False},
},
"serverInfo": {"name": "network-mcp", "version": SERVER_VERSION},
"instructions": "Start with list_hosts (search by hostname/IP/MAC), then use get_host for details and list_events/host_events for timelines; network_map gives a quick /24-/64 overview.",
}
def process_rpc_request(payload: Dict[str, Any]):
if not isinstance(payload, dict):
return jsonrpc_error(None, -32600, "Invalid request")
rpc_id = payload.get("id")
method = payload.get("method")
params = payload.get("params") or {}
is_notification = rpc_id is None
if method == "initialize":
requested = params.get("protocolVersion")
requested_str = str(requested) if requested is not None else None
return {"jsonrpc": "2.0", "id": rpc_id, "result": build_initialize_result(requested_str)}
if method == "ping":
return {"jsonrpc": "2.0", "id": rpc_id, "result": {}}
if method == "tools/list":
result = {"tools": list_mcp_tools(), "nextCursor": None}
return {"jsonrpc": "2.0", "id": rpc_id, "result": result}
if method == "resources/list":
result = {"resources": list_mcp_resources(), "nextCursor": None}
return {"jsonrpc": "2.0", "id": rpc_id, "result": result}
if method == "resources/templates/list":
result = {"resourceTemplates": list_mcp_resource_templates(), "nextCursor": None}
return {"jsonrpc": "2.0", "id": rpc_id, "result": result}
if method == "resources/read":
uri = (params or {}).get("uri")
if not uri:
return jsonrpc_error(rpc_id, -32602, "uri is required")
try:
result = read_mcp_resource(uri)
return {"jsonrpc": "2.0", "id": rpc_id, "result": result}
except ValueError as exc:
return jsonrpc_error(rpc_id, -32602, str(exc))
except KeyError as exc:
message = exc.args[0] if exc.args else str(exc)
return jsonrpc_error(rpc_id, -32004, message)
if method == "notifications/initialized":
# No response for notifications.
return None
if method == "tools/call":
name = params.get("name")
if not name:
if is_notification:
return None
return jsonrpc_error(rpc_id, -32602, "Tool name is required")
arguments = params.get("arguments") or {}
try:
summary, data = call_tool_by_name(name, arguments)
result = {
"content": [{"type": "text", "text": summary}],
"structuredContent": data,
"isError": False,
}
if is_notification:
return None
return {"jsonrpc": "2.0", "id": rpc_id, "result": result}
except ValueError as exc:
if is_notification:
return None
result = {
"content": [{"type": "text", "text": f"Tool argument error: {exc}"}],
"structuredContent": {"error": str(exc)},
"isError": True,
}
return {"jsonrpc": "2.0", "id": rpc_id, "result": result}
except KeyError as exc:
message = exc.args[0] if exc.args else str(exc)
if is_notification:
return None
result = {
"content": [{"type": "text", "text": f"Tool error: {message}"}],
"structuredContent": {"error": message},
"isError": True,
}
return {"jsonrpc": "2.0", "id": rpc_id, "result": result}
except Exception as exc: # pragma: no cover - defensive
if is_notification:
return None
return jsonrpc_error(rpc_id, -32603, f"Internal error: {exc}")
if is_notification:
return None
return jsonrpc_error(rpc_id, -32601, f"Method {method} not found")
def process_rpc_envelope(payload: Any):
if isinstance(payload, list):
responses = []
for entry in payload:
response = process_rpc_request(entry)
if response is not None:
responses.append(response)
return responses
if isinstance(payload, dict):
return process_rpc_request(payload)
return jsonrpc_error(None, -32600, "Invalid request")
@app.route("/api/hosts")
def api_hosts():
limit = min(int(request.args.get("limit", HOST_SEARCH_LIMIT)), 5000)
q_args = request.args.getlist("q")
search_terms = parse_search_terms(q_args)
hosts = fetch_hosts(
limit=limit,
source=request.args.get("source"),
search_terms=search_terms if search_terms else None,
)
return jsonify({"hosts": hosts, "total": len(hosts)})
@app.route("/api/hosts/<path:host_id>")
def api_host_detail(host_id: str):
host = fetch_host_by_id(host_id)
if not host:
abort(404, description=f"Host {host_id} not found")
include_events = bool_arg(request.args.get("include_events"), default=False)
result = {"host": host}
if include_events:
limit = min(int(request.args.get("events_limit", DEFAULT_EVENT_LIMIT)), 1000)
result["events"] = fetch_events(host_id=host_id, limit=limit)
return jsonify(result)
@app.route("/api/events")
def api_events():
limit = min(int(request.args.get("limit", DEFAULT_EVENT_LIMIT)), 1000)
events = fetch_events(
host_id=request.args.get("host_id"),
limit=limit,
event_type=request.args.get("type"),
since=request.args.get("since"),
)
return jsonify({"events": events, "total": len(events)})
@app.route("/api/hosts/<path:host_id>/events")
def api_host_events(host_id: str):
limit = min(int(request.args.get("limit", DEFAULT_EVENT_LIMIT)), 1000)
events = fetch_events(host_id=host_id, limit=limit, event_type=request.args.get("type"), since=request.args.get("since"))
return jsonify({"events": events, "total": len(events)})
@app.route("/api/map")
def api_map():
limit = min(int(request.args.get("limit", HOST_SEARCH_LIMIT)), 5000)
hosts = fetch_hosts(limit=limit)
network_map = build_network_map(hosts)
return jsonify({"networks": network_map, "host_count": len(hosts)})
@app.route("/.well-known/mcp.json", methods=["GET", "POST", "OPTIONS"])
@app.route("/api/mcp", methods=["GET", "POST", "OPTIONS"])
def api_manifest():
if request.method == "OPTIONS":
return ("", 204, {"Allow": "GET,POST,OPTIONS"})
if request.method == "POST":
payload = request.get_json(silent=True)
if payload is None:
return jsonify(jsonrpc_error(None, -32700, "Invalid JSON")), 400
rpc_response = process_rpc_envelope(payload)
if rpc_response is None or (isinstance(rpc_response, list) and not rpc_response):
return ("", 204)
return jsonify(rpc_response)
manifest = build_manifest(request.url_root.rstrip("/"))
return jsonify(manifest)
@app.route("/")
def index():
hosts = fetch_hosts()
total = len(hosts)
with_ports = sum(1 for h in hosts if h["ports"])
inventory_hosts = sum(1 for h in hosts if "inventory" in h["sources"])
return render_template(
"index.html",
hosts=hosts,
total=total,
with_ports=with_ports,
inventory_hosts=inventory_hosts,
es_url=ES_URL,
)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=int(os.getenv("FRONTEND_PORT", "5001")))