"""Flask frontend and MCP (JSON-RPC) endpoint for network host/event data.

Hosts are read from the ``network-hosts`` Elasticsearch index and events from
``network-events-*``. The same data is exposed three ways: plain REST routes
under ``/api``, an MCP JSON-RPC endpoint (``/api/mcp`` and
``/.well-known/mcp.json``), and an HTML index page.
"""
import base64
import ipaddress
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import parse_qs, unquote, urlparse

import requests
from dotenv import load_dotenv
from flask import Flask, abort, jsonify, render_template, request

BASE_DIR = Path(__file__).resolve().parent.parent
env_path = BASE_DIR / ".env"
if env_path.exists():
    load_dotenv(env_path)

ES_URL = os.getenv("ES_URL", "http://localhost:9200").rstrip("/")
ES_VERIFY_SSL = os.getenv("ES_VERIFY_SSL", "false").lower() == "true"
# Seconds to wait for Elasticsearch; without a timeout a stalled backend
# would block the request handler indefinitely.
ES_TIMEOUT = float(os.getenv("ES_TIMEOUT", "30"))

app = Flask(__name__)

HOST_SEARCH_LIMIT = int(os.getenv("FRONTEND_HOST_LIMIT", "1000"))
DEFAULT_EVENT_LIMIT = int(os.getenv("FRONTEND_EVENT_LIMIT", "200"))
SERVER_VERSION = os.getenv("NETWORK_MCP_VERSION", "0.1.0")

# Hard upper bounds applied to caller-supplied limits (REST and MCP alike).
MAX_HOST_LIMIT = 5000
MAX_EVENT_LIMIT = 1000

REST_TOOLS = [
    {
        "name": "list_hosts",
        "description": "Return the merged view of every known device on the network (searchable by hostname, IP, or MAC).",
        "method": "GET",
        "path": "/api/hosts",
    },
    {
        "name": "network_map",
        "description": "Summarize hosts grouped by detected /24 (IPv4) or /64 (IPv6) networks.",
        "method": "GET",
        "path": "/api/map",
    },
    {
        "name": "get_host",
        "description": "Fetch a single host document by ID (e.g. ip:192.168.5.10).",
        "method": "GET",
        "path": "/api/hosts/{host_id}",
    },
    {
        "name": "list_events",
        "description": "List recent scan/discovery events with filters for host, type, or time range.",
        "method": "GET",
        "path": "/api/events",
    },
    {
        "name": "host_events",
        "description": "List the recent events associated with a specific host.",
        "method": "GET",
        "path": "/api/hosts/{host_id}/events",
    },
]


def tool_schema(
    description: str,
    properties: Dict[str, Any],
    required: Optional[List[str]] = None,
    title: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a closed JSON-Schema ``object`` definition.

    ``required`` and ``title`` are only added to the schema when truthy.
    """
    schema: Dict[str, Any] = {
        "type": "object",
        "description": description,
        "properties": properties,
        "additionalProperties": False,
    }
    if required:
        schema["required"] = required
    if title:
        schema["title"] = title
    return schema


PORT_SCHEMA = tool_schema(
    "Observed port entry.",
    {
        "port": {"type": "integer", "description": "Port number."},
        "state": {"type": "string", "description": "State reported by nmap (e.g. open, closed)."},
        "service": {"type": "string", "description": "Detected service name, if available."},
    },
    required=["port"],
    title="Port",
)

HOST_SCHEMA = tool_schema(
    "Host summary merged from inventory, OPNsense, and nmap.",
    {
        "id": {"type": "string", "description": "Stable host identifier (ip:* or mac:*)."},
        "name": {"type": "string", "description": "Best-known display name."},
        "ips": {"type": "array", "items": {"type": "string"}, "description": "Associated IP addresses."},
        "macs": {"type": "array", "items": {"type": "string"}, "description": "Observed MAC addresses."},
        "hostnames": {"type": "array", "items": {"type": "string"}, "description": "DNS or hostnames discovered."},
        "sources": {"type": "array", "items": {"type": "string"}, "description": "Data sources contributing to this record."},
        "last_seen": {"type": "string", "description": "ISO timestamp of the most recent observation."},
        "notes": {"type": "string", "description": "Inventory notes/annotations, if present."},
        "expected_ports": {"type": "array", "items": {"type": "string"}, "description": "Ports expected per inventory targets."},
        "ports": {"type": "array", "items": PORT_SCHEMA, "description": "Latest observed open ports."},
    },
    required=["id"],
    title="Host",
)

EVENT_SCHEMA = tool_schema(
    "Scan or discovery event emitted by collectors.",
    {
        "id": {"type": "string", "description": "Event document identifier."},
        "timestamp": {"type": "string", "description": "Observation timestamp (@timestamp)."},
        "source": {"type": "string", "description": "Collector that produced the event (nmap, opnsense, inventory)."},
        "event": {"type": "object", "description": "Event metadata (type, outcome)."},
        "host": HOST_SCHEMA,
        "ports": {"type": "array", "items": PORT_SCHEMA, "description": "Ports included with the event (if any)."},
    },
    required=["id", "timestamp"],
    title="Event",
)

NETWORK_ENTRY_SCHEMA = tool_schema(
    "Network grouping entry showing hosts per /24 or /64.",
    {
        "cidr": {"type": "string", "description": "CIDR label (e.g. 192.168.5.0/24)."},
        "hosts": {"type": "array", "items": HOST_SCHEMA, "description": "Hosts that belong to this network."},
    },
    required=["cidr", "hosts"],
    title="NetworkEntry",
)

MCP_TOOL_DEFINITIONS = {
    "list_hosts": {
        "title": "List Hosts",
        "description": "Return the merged view of every known device on the network with optional filtering by source or identifier.",
        "annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False},
        "inputSchema": tool_schema(
            "Filter options when listing hosts.",
            {
                "limit": {"type": "integer", "minimum": 1, "maximum": 5000, "title": "Limit", "description": "Maximum number of hosts to return."},
                "source": {"type": "string", "title": "Source filter", "description": "Only include hosts that contain this source tag (e.g. inventory, nmap, opnsense-arp)."},
                "terms": {
                    "type": "array",
                    "items": {"type": "string"},
                    "title": "Search terms",
                    "description": "Identifiers (names, hostnames, IPs, or MACs) to match. Equivalent to repeated q parameters in the REST API.",
                },
            },
            title="ListHostsInput",
        ),
        "outputSchema": tool_schema(
            "Host list result payload.",
            {
                "total": {"type": "integer", "description": "Number of hosts returned."},
                "hosts": {"type": "array", "items": HOST_SCHEMA, "description": "Host entries sorted by last-seen time."},
            },
            required=["total", "hosts"],
            title="ListHostsResult",
        ),
    },
    "network_map": {
        "title": "Network Map",
        "description": "Summarize hosts grouped by detected /24 (IPv4) or /64 (IPv6) ranges.",
        "annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False},
        "inputSchema": tool_schema(
            "Options when generating the network grouping.",
            {
                "limit": {"type": "integer", "minimum": 1, "maximum": 5000, "title": "Host limit", "description": "Maximum number of hosts to consider when building the map."},
            },
            title="NetworkMapInput",
        ),
        "outputSchema": tool_schema(
            "Grouped view of networks and their hosts.",
            {
                "host_count": {"type": "integer", "description": "Number of hosts examined for this map."},
                "networks": {"type": "array", "items": NETWORK_ENTRY_SCHEMA, "description": "List of network segments and their hosts."},
            },
            required=["host_count", "networks"],
            title="NetworkMapResult",
        ),
    },
    "get_host": {
        "title": "Get Host",
        "description": "Fetch a single host document by ID, optionally including recent events.",
        "annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False},
        "inputSchema": tool_schema(
            "Parameters for retrieving an individual host.",
            {
                "host_id": {"type": "string", "title": "Host ID", "description": "Host identifier (e.g. ip:192.168.5.10, mac:aa:bb:cc...)."},
                "include_events": {"type": "boolean", "title": "Include events", "description": "If true, include recent events for the host."},
                "events_limit": {"type": "integer", "minimum": 1, "maximum": 1000, "title": "Events limit", "description": "Number of events to include if requested."},
            },
            required=["host_id"],
            title="GetHostInput",
        ),
        "outputSchema": tool_schema(
            "Host payload with optional embedded events.",
            {
                "host": HOST_SCHEMA,
                "events": {"type": "array", "items": EVENT_SCHEMA, "description": "Recent events when include_events=true."},
            },
            required=["host"],
            title="GetHostResult",
        ),
    },
    "list_events": {
        "title": "List Events",
        "description": "List recent scan/discovery events with optional filters.",
        "annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False},
        "inputSchema": tool_schema(
            "Filters applied when querying events.",
            {
                "limit": {"type": "integer", "minimum": 1, "maximum": 1000, "title": "Limit", "description": "Maximum number of events to return."},
                "host_id": {"type": "string", "title": "Host filter", "description": "Only include events for this host identifier."},
                "type": {"type": "string", "title": "Event type", "description": "Restrict to a specific event type (e.g. scan, discovery)."},
                "since": {"type": "string", "title": "Since timestamp", "description": "ISO8601 timestamp used as a lower bound for @timestamp."},
            },
            title="ListEventsInput",
        ),
        "outputSchema": tool_schema(
            "Event search result.",
            {
                "total": {"type": "integer", "description": "Number of events returned."},
                "events": {"type": "array", "items": EVENT_SCHEMA, "description": "Event documents sorted by timestamp."},
            },
            required=["total", "events"],
            title="ListEventsResult",
        ),
    },
    "host_events": {
        "title": "Host Events",
        "description": "List recent events associated with a specific host.",
        "annotations": {"readOnlyHint": True, "destructiveHint": False, "openWorldHint": False},
        "inputSchema": tool_schema(
            "Parameters when retrieving events bound to a single host.",
            {
                "host_id": {"type": "string", "title": "Host ID", "description": "Host identifier to filter by."},
                "limit": {"type": "integer", "minimum": 1, "maximum": 1000, "title": "Limit", "description": "Maximum number of events to return."},
                "type": {"type": "string", "title": "Event type", "description": "Restrict to a specific event type (e.g. scan, discovery)."},
                "since": {"type": "string", "title": "Since timestamp", "description": "ISO8601 timestamp used as a lower bound for @timestamp."},
            },
            required=["host_id"],
            title="HostEventsInput",
        ),
        "outputSchema": tool_schema(
            "Event list scoped to a host.",
            {
                "total": {"type": "integer", "description": "Number of events returned for the host."},
                "events": {"type": "array", "items": EVENT_SCHEMA, "description": "Host-specific event entries."},
            },
            required=["total", "events"],
            title="HostEventsResult",
        ),
    },
}


def resolve_api_key(
    api_id: Optional[str], api_key: Optional[str]
) -> Tuple[Optional[str], Optional[str]]:
    """Return an ``(id, key)`` pair from the configured API credentials.

    Accepted forms, tried in order: separate id and key; a plain ``id:key``
    string in ``api_key``; a base64-encoded ``id:key`` string in ``api_key``.
    Returns ``(None, None)`` when no pair can be derived.
    """
    if api_id and api_key:
        return api_id, api_key
    if not api_key:
        return None, None
    if ":" in api_key:
        possible_id, possible_key = api_key.split(":", 1)
        return possible_id, possible_key
    try:
        decoded = base64.b64decode(api_key).decode()
    except ValueError:
        # binascii.Error and UnicodeDecodeError are both ValueError
        # subclasses: the key is simply not a base64-encoded "id:key" pair.
        return None, None
    if ":" in decoded:
        possible_id, possible_key = decoded.split(":", 1)
        return possible_id, possible_key
    return None, None


def build_es_request() -> Tuple[Dict[str, str], Optional[Tuple[str, str]]]:
    """Return ``(headers, auth)`` for Elasticsearch requests.

    Uses an ``ApiKey`` Authorization header when ES_API_ID/ES_API_KEY resolve
    to a pair; otherwise falls back to basic auth from ES_USER/ES_PASS.
    """
    headers: Dict[str, str] = {}
    auth = None
    api_id, api_key = resolve_api_key(os.getenv("ES_API_ID"), os.getenv("ES_API_KEY"))
    if api_id and api_key:
        token = base64.b64encode(f"{api_id}:{api_key}".encode()).decode()
        headers["Authorization"] = f"ApiKey {token}"
    else:
        auth = (os.getenv("ES_USER", "elastic"), os.getenv("ES_PASS", "changeme"))
    return headers, auth


def _es_search(index: str, body: Dict[str, Any], missing_ok: bool = False) -> List[Dict[str, Any]]:
    """Run a ``_search`` against ``index`` and return the raw hit list.

    With ``missing_ok`` a 404 response yields an empty list instead of raising.
    Other HTTP errors raise ``requests.HTTPError``.
    """
    headers, auth = build_es_request()
    resp = requests.get(
        f"{ES_URL}/{index}/_search",
        json=body,
        headers=headers,
        auth=auth,
        verify=ES_VERIFY_SSL,
        timeout=ES_TIMEOUT,
    )
    if missing_ok and resp.status_code == 404:
        return []
    resp.raise_for_status()
    return resp.json()["hits"]["hits"]


def normalize_host(doc: Dict) -> Dict:
    """Flatten a host document (``host`` + ``ports`` keys) into the API shape.

    List fields that are missing or explicitly null become empty lists so that
    callers can iterate them unconditionally.
    """
    host = doc.get("host") or {}
    ports = doc.get("ports") or []
    return {
        "id": host.get("id"),
        "name": host.get("name") or host.get("id"),
        "ips": host.get("ips") or [],
        "macs": host.get("macs") or [],
        "hostnames": host.get("hostnames") or [],
        "sources": host.get("sources") or [],
        "last_seen": host.get("last_seen"),
        "notes": host.get("notes"),
        "expected_ports": host.get("expected_ports") or [],
        "ports": [
            {
                "port": p.get("port"),
                "state": p.get("state"),
                "service": (p.get("service") or {}).get("name"),
            }
            for p in ports
        ],
    }


def parse_search_terms(raw_terms: List[str]) -> List[str]:
    """Split raw query values on commas and whitespace into individual terms."""
    terms: List[str] = []
    for raw in raw_terms:
        if not raw:
            continue
        # str.split() with no argument already drops empty chunks.
        terms.extend(raw.replace(",", " ").split())
    return terms


def coerce_string_list(value: Any) -> List[str]:
    """Return ``value`` as a list of strings; unsupported types give ``[]``."""
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value if item is not None]
    return []


def clamp_int(value: Any, default: int, min_value: int, max_value: int) -> int:
    """Parse ``value`` as an int clamped to ``[min_value, max_value]``.

    ``None`` or an unparsable value returns ``default`` unchanged.
    """
    if value is None:
        return default
    try:
        parsed = int(value)
    except (TypeError, ValueError):
        return default
    return max(min_value, min(max_value, parsed))


def coerce_bool(value: Any, default: bool = False) -> bool:
    """Interpret booleans and the strings 1/true/yes/on (case-insensitive)."""
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value.lower() in {"1", "true", "yes", "on"}
    return default


def build_search_clause(term: str) -> Dict:
    """Build a case-insensitive substring match over the host identity fields."""
    wildcard = f"*{term}*"
    return {
        "bool": {
            "should": [
                {"wildcard": {"host.name.keyword": {"value": wildcard, "case_insensitive": True}}},
                {"wildcard": {"host.hostnames.keyword": {"value": wildcard, "case_insensitive": True}}},
                {"wildcard": {"host.id.keyword": {"value": wildcard, "case_insensitive": True}}},
                {"wildcard": {"host.ips": {"value": wildcard, "case_insensitive": True}}},
                {"wildcard": {"host.macs": {"value": wildcard, "case_insensitive": True}}},
            ],
            "minimum_should_match": 1,
        }
    }


def fetch_hosts(
    limit: int = HOST_SEARCH_LIMIT,
    source: Optional[str] = None,
    search_terms: Optional[List[str]] = None,
):
    """Return up to ``limit`` normalized hosts, newest ``last_seen`` first.

    ``source`` filters on an exact source tag; a host matches ``search_terms``
    when any one term matches.
    """
    body: Dict[str, Any] = {
        "size": limit,
        "sort": [{"host.last_seen": {"order": "desc"}}],
    }
    filters: List[Dict] = []
    if source:
        filters.append({"term": {"host.sources.keyword": source}})
    if search_terms:
        should_clauses = [build_search_clause(term) for term in search_terms]
        filters.append({"bool": {"should": should_clauses, "minimum_should_match": 1}})
    if filters:
        body["query"] = {"bool": {"filter": filters}}
    hits = _es_search("network-hosts", body)
    return [normalize_host(hit.get("_source", {})) for hit in hits]


def fetch_host_by_id(host_id: str) -> Optional[Dict]:
    """Return the normalized host with this exact id, or ``None`` if absent."""
    body = {"size": 1, "query": {"term": {"host.id.keyword": host_id}}}
    hits = _es_search("network-hosts", body)
    if not hits:
        return None
    return normalize_host(hits[0].get("_source", {}))


def fetch_events(
    host_id: Optional[str] = None,
    limit: int = DEFAULT_EVENT_LIMIT,
    event_type: Optional[str] = None,
    since: Optional[str] = None,
):
    """Return up to ``limit`` events, newest first, with optional filters.

    A 404 from Elasticsearch (no matching event index) yields an empty list.
    """
    filters: List[Dict] = []
    if host_id:
        filters.append({"term": {"host.id.keyword": host_id}})
    if event_type:
        filters.append({"term": {"event.type.keyword": event_type}})
    if since:
        filters.append({"range": {"@timestamp": {"gte": since}}})
    body: Dict = {
        "size": limit,
        "sort": [{"@timestamp": {"order": "desc"}}],
    }
    if filters:
        body["query"] = {"bool": {"filter": filters}}
    events = []
    for hit in _es_search("network-events-*", body, missing_ok=True):
        doc = hit.get("_source", {})
        events.append(
            {
                "id": hit.get("_id"),
                "timestamp": doc.get("@timestamp"),
                "event": doc.get("event", {}),
                "host": doc.get("host", {}),
                "observed": doc.get("observed"),
                "scan": doc.get("scan"),
                "ports": doc.get("ports", []),
                "source": doc.get("source"),
            }
        )
    return events


def derive_network_label(ip: str) -> str:
    """Return the enclosing /24 (IPv4) or /64 (IPv6) CIDR label for ``ip``.

    Empty or unparsable addresses yield ``"unknown"``. Parsing goes through
    ``ipaddress`` so compressed IPv6 forms such as ``fe80::1`` map to a valid,
    canonical network instead of a malformed label.
    """
    if not ip:
        return "unknown"
    try:
        # Drop any IPv6 zone identifier ("%eth0") before parsing.
        address = ipaddress.ip_address(str(ip).split("%", 1)[0].strip())
    except ValueError:
        return "unknown"
    prefix = 24 if address.version == 4 else 64
    return str(ipaddress.ip_network(f"{address}/{prefix}", strict=False))


def build_network_map(hosts: List[Dict]):
    """Group hosts by network label; networks and their hosts are sorted.

    A host with addresses in several networks appears once in each of them.
    """
    networks: Dict[str, Dict] = {}
    for host in hosts:
        seen = set()
        for ip in host.get("ips") or []:
            label = derive_network_label(ip)
            if label in seen:
                continue
            seen.add(label)
            entry = networks.setdefault(label, {"cidr": label, "hosts": []})
            entry["hosts"].append(
                {
                    "id": host.get("id"),
                    "name": host.get("name"),
                    "ips": host.get("ips") or [],
                    "sources": host.get("sources") or [],
                    "last_seen": host.get("last_seen"),
                }
            )
    sorted_networks = sorted(networks.values(), key=lambda n: n["cidr"])
    for entry in sorted_networks:
        entry["hosts"].sort(key=lambda h: h.get("name") or h.get("id") or "")
    return sorted_networks


def bool_arg(value: Optional[str], default: bool = False) -> bool:
    """Parse a boolean query-string flag (same rules as ``coerce_bool``)."""
    return coerce_bool(value, default=default)


def build_manifest(base_url: str) -> Dict:
    """Return the REST tool manifest with absolute URLs rooted at ``base_url``."""
    base = base_url.rstrip("/")
    tools = [
        {
            "name": tool["name"],
            "description": tool["description"],
            "method": tool["method"],
            "path": tool["path"],
            "url": f"{base}{tool['path']}",
        }
        for tool in REST_TOOLS
    ]
    return {
        "name": "network-mcp",
        "description": "Network discovery source-of-truth backed by Elasticsearch, Nmap, and OPNsense.",
        "schema": "1.0",
        "tools": tools,
        "auth": "env",
    }


def tool_result(summary: str, data: Dict[str, Any]):
    """Pair a human-readable summary with the structured tool payload."""
    return summary, data


def handle_tool_list_hosts(arguments: Dict[str, Any]):
    """MCP ``list_hosts``: search hosts by optional terms and source."""
    limit = clamp_int(arguments.get("limit"), HOST_SEARCH_LIMIT, 1, MAX_HOST_LIMIT)
    raw_terms = coerce_string_list(arguments.get("terms"))
    search_terms = parse_search_terms(raw_terms)
    hosts = fetch_hosts(limit=limit, source=arguments.get("source"), search_terms=search_terms or None)
    return tool_result(f"Returned {len(hosts)} hosts.", {"hosts": hosts, "total": len(hosts)})


def handle_tool_network_map(arguments: Dict[str, Any]):
    """MCP ``network_map``: group up to ``limit`` hosts by network."""
    limit = clamp_int(arguments.get("limit"), HOST_SEARCH_LIMIT, 1, MAX_HOST_LIMIT)
    hosts = fetch_hosts(limit=limit)
    network_map = build_network_map(hosts)
    return tool_result(f"Computed {len(network_map)} networks.", {"networks": network_map, "host_count": len(hosts)})


def handle_tool_get_host(arguments: Dict[str, Any]):
    """MCP ``get_host``: fetch one host, optionally with its recent events.

    Raises ``ValueError`` when ``host_id`` is missing and ``KeyError`` when the
    host does not exist.
    """
    host_id = arguments.get("host_id")
    if not host_id:
        raise ValueError("host_id is required")
    host = fetch_host_by_id(host_id)
    if not host:
        raise KeyError(f"Host {host_id} not found")
    include_events = coerce_bool(arguments.get("include_events"), default=False)
    result = {"host": host}
    if include_events:
        events_limit = clamp_int(arguments.get("events_limit"), DEFAULT_EVENT_LIMIT, 1, MAX_EVENT_LIMIT)
        result["events"] = fetch_events(host_id=host_id, limit=events_limit)
    return tool_result(f"Fetched host {host_id}.", result)


def handle_tool_list_events(arguments: Dict[str, Any]):
    """MCP ``list_events``: list events with optional host/type/since filters."""
    limit = clamp_int(arguments.get("limit"), DEFAULT_EVENT_LIMIT, 1, MAX_EVENT_LIMIT)
    events = fetch_events(
        host_id=arguments.get("host_id"),
        limit=limit,
        event_type=arguments.get("type"),
        since=arguments.get("since"),
    )
    return tool_result(f"Returned {len(events)} events.", {"events": events, "total": len(events)})


def handle_tool_host_events(arguments: Dict[str, Any]):
    """MCP ``host_events``: list events for one host (``host_id`` required)."""
    host_id = arguments.get("host_id")
    if not host_id:
        raise ValueError("host_id is required")
    limit = clamp_int(arguments.get("limit"), DEFAULT_EVENT_LIMIT, 1, MAX_EVENT_LIMIT)
    events = fetch_events(
        host_id=host_id,
        limit=limit,
        event_type=arguments.get("type"),
        since=arguments.get("since"),
    )
    return tool_result(f"Returned {len(events)} events for {host_id}.", {"events": events, "total": len(events)})


TOOL_HANDLERS = {
    "list_hosts": handle_tool_list_hosts,
    "network_map": handle_tool_network_map,
    "get_host": handle_tool_get_host,
    "list_events": handle_tool_list_events,
    "host_events": handle_tool_host_events,
}


def list_mcp_tools():
    """Return the MCP tool descriptors built from ``MCP_TOOL_DEFINITIONS``."""
    tools = []
    for name, meta in MCP_TOOL_DEFINITIONS.items():
        tool = {
            "name": name,
            "description": meta.get("description"),
            "inputSchema": meta.get("inputSchema", {"type": "object"}),
        }
        # Optional keys are only emitted when the definition provides them.
        for key in ("title", "outputSchema", "annotations"):
            value = meta.get(key)
            if value:
                tool[key] = value
        tools.append(tool)
    return tools


def call_tool_by_name(name: str, arguments: Optional[Dict[str, Any]] = None):
    """Dispatch to the named tool handler; unknown names raise ``KeyError``."""
    if name not in TOOL_HANDLERS:
        raise KeyError(f"Unknown tool: {name}")
    handler = TOOL_HANDLERS[name]
    summary, data = handler(arguments or {})
    return summary, data


def list_mcp_resources(base_uri: str = "network://"):
    """Return the static MCP resource descriptors."""
    return [
        {
            "uri": f"{base_uri}hosts",
            "name": "hosts",
            "title": "Hosts (Snapshot)",
            "mimeType": "application/json",
            "description": "Snapshot of merged hosts (inventory + opnsense + nmap). Use resources/templates/list for search parameters.",
        },
        {
            "uri": f"{base_uri}map",
            "name": "map",
            "title": "Network Map (Snapshot)",
            "mimeType": "application/json",
            "description": "Snapshot of networks grouped by /24 (IPv4) or /64 (IPv6).",
        },
        {
            "uri": f"{base_uri}events",
            "name": "events",
            "title": "Recent Events (Snapshot)",
            "mimeType": "application/json",
            "description": "Recent scan/discovery events. Use resources/templates/list for filters (host_id/type/since).",
        },
    ]


def list_mcp_resource_templates(base_uri: str = "network://"):
    """Return the parameterized MCP resource templates."""
    return [
        {
            "uriTemplate": f"{base_uri}hosts{{?q,source,limit}}",
            "name": "hosts_query",
            "title": "Hosts Query",
            "mimeType": "application/json",
            "description": "Query hosts by q (hostname/IP/MAC/name, case-insensitive), source, and limit. Repeat q to provide multiple terms.",
        },
        {
            "uriTemplate": f"{base_uri}host/{{host_id}}{{?include_events,events_limit}}",
            "name": "host_detail",
            "title": "Host Detail",
            "mimeType": "application/json",
            "description": "Fetch a single host by host_id (e.g. mac:aa:bb.. or ip:192.168.5.10). Optionally include events.",
        },
        {
            "uriTemplate": f"{base_uri}events{{?host_id,type,since,limit}}",
            "name": "events_query",
            "title": "Events Query",
            "mimeType": "application/json",
            "description": "Query recent events with optional filters host_id, type, since (ISO8601), and limit.",
        },
        {
            "uriTemplate": f"{base_uri}map{{?limit}}",
            "name": "map_query",
            "title": "Network Map",
            "mimeType": "application/json",
            "description": "Build a network map from up to limit hosts.",
        },
    ]


def _resource_contents(uri: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap ``payload`` as a single JSON text entry of a resources/read result."""
    return {"contents": [{"uri": uri, "mimeType": "application/json", "text": json.dumps(payload)}]}


def read_mcp_resource(uri: str):
    """Resolve a ``network://`` resource URI to its JSON contents.

    Raises ``ValueError`` for an unsupported scheme, an unknown resource, or a
    host URI without an id; raises ``KeyError`` when the host does not exist.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "network":
        raise ValueError(f"Unsupported resource URI scheme: {parsed.scheme}")
    netloc = parsed.netloc
    query = parse_qs(parsed.query or "")

    def first(name: str) -> Optional[str]:
        # parse_qs maps each key to a list; only the first value is used.
        return (query.get(name) or [None])[0]

    if netloc == "hosts":
        limit = clamp_int(first("limit"), HOST_SEARCH_LIMIT, 1, MAX_HOST_LIMIT)
        search_terms = parse_search_terms(query.get("q") or [])
        hosts = fetch_hosts(limit=limit, source=first("source"), search_terms=search_terms or None)
        return _resource_contents(uri, {"hosts": hosts, "total": len(hosts)})

    if netloc == "map":
        limit = clamp_int(first("limit"), HOST_SEARCH_LIMIT, 1, MAX_HOST_LIMIT)
        hosts = fetch_hosts(limit=limit)
        return _resource_contents(uri, {"networks": build_network_map(hosts), "host_count": len(hosts)})

    if netloc == "events":
        limit = clamp_int(first("limit"), DEFAULT_EVENT_LIMIT, 1, MAX_EVENT_LIMIT)
        events = fetch_events(
            host_id=first("host_id"),
            limit=limit,
            event_type=first("type"),
            since=first("since"),
        )
        return _resource_contents(uri, {"events": events, "total": len(events)})

    if netloc == "host":
        host_id = unquote((parsed.path or "").lstrip("/"))
        if not host_id:
            raise ValueError("Host resource requires /<host_id> path")
        include_events = coerce_bool(first("include_events"), default=False)
        events_limit = clamp_int(first("events_limit"), DEFAULT_EVENT_LIMIT, 1, MAX_EVENT_LIMIT)
        host = fetch_host_by_id(host_id)
        if not host:
            raise KeyError(f"Host {host_id} not found")
        payload: Dict[str, Any] = {"host": host}
        if include_events:
            payload["events"] = fetch_events(host_id=host_id, limit=events_limit)
        return _resource_contents(uri, payload)

    raise ValueError(f"Unknown resource URI: {uri}")


def jsonrpc_error(rpc_id: Any, code: int, message: str):
    """Build a JSON-RPC 2.0 error response object."""
    return {
        "jsonrpc": "2.0",
        "id": rpc_id,
        "error": {"code": code, "message": message},
    }


def _jsonrpc_result(rpc_id: Any, result: Any) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 success response object."""
    return {"jsonrpc": "2.0", "id": rpc_id, "result": result}


def _tool_error_result(text: str, message: str) -> Dict[str, Any]:
    """Build a tools/call result flagged as an error (``isError`` true)."""
    return {
        "content": [{"type": "text", "text": text}],
        "structuredContent": {"error": message},
        "isError": True,
    }


def build_initialize_result(protocol_version: Optional[str] = None):
    """Return the ``initialize`` result, echoing the requested protocol version."""
    protocol_version = protocol_version or "2025-11-25"
    return {
        "protocolVersion": protocol_version,
        "capabilities": {
            "tools": {"listChanged": False},
            "resources": {"listChanged": False, "subscribe": False},
        },
        "serverInfo": {"name": "network-mcp", "version": SERVER_VERSION},
        "instructions": "Start with list_hosts (search by hostname/IP/MAC), then use get_host for details and list_events/host_events for timelines; network_map gives a quick /24-/64 overview.",
    }


def process_rpc_request(payload: Dict[str, Any]):
    """Handle one JSON-RPC request object.

    Returns the response dict, or ``None`` when no response should be sent.
    A request whose ``id`` is missing or null is treated as a notification for
    tools/call, unknown methods, and malformed params.
    """
    if not isinstance(payload, dict):
        return jsonrpc_error(None, -32600, "Invalid request")
    rpc_id = payload.get("id")
    method = payload.get("method")
    params = payload.get("params") or {}
    is_notification = rpc_id is None

    # The handlers below call params.get(); reject positional/scalar params up
    # front instead of failing with AttributeError.
    if not isinstance(params, dict):
        if is_notification:
            return None
        return jsonrpc_error(rpc_id, -32602, "params must be an object")

    if method == "initialize":
        requested = params.get("protocolVersion")
        requested_str = str(requested) if requested is not None else None
        return _jsonrpc_result(rpc_id, build_initialize_result(requested_str))

    if method == "ping":
        return _jsonrpc_result(rpc_id, {})

    if method == "tools/list":
        return _jsonrpc_result(rpc_id, {"tools": list_mcp_tools(), "nextCursor": None})

    if method == "resources/list":
        return _jsonrpc_result(rpc_id, {"resources": list_mcp_resources(), "nextCursor": None})

    if method == "resources/templates/list":
        return _jsonrpc_result(
            rpc_id, {"resourceTemplates": list_mcp_resource_templates(), "nextCursor": None}
        )

    if method == "resources/read":
        uri = params.get("uri")
        if not uri:
            return jsonrpc_error(rpc_id, -32602, "uri is required")
        try:
            return _jsonrpc_result(rpc_id, read_mcp_resource(uri))
        except ValueError as exc:
            return jsonrpc_error(rpc_id, -32602, str(exc))
        except KeyError as exc:
            message = exc.args[0] if exc.args else str(exc)
            return jsonrpc_error(rpc_id, -32004, message)
        except Exception as exc:  # pragma: no cover - defensive
            # Mirror tools/call: backend failures become a JSON-RPC internal
            # error rather than an unhandled HTTP 500.
            return jsonrpc_error(rpc_id, -32603, f"Internal error: {exc}")

    if method == "notifications/initialized":
        # No response for notifications.
        return None

    if method == "tools/call":
        name = params.get("name")
        if not name:
            if is_notification:
                return None
            return jsonrpc_error(rpc_id, -32602, "Tool name is required")
        arguments = params.get("arguments") or {}
        if not isinstance(arguments, dict):
            if is_notification:
                return None
            return jsonrpc_error(rpc_id, -32602, "arguments must be an object")
        try:
            summary, data = call_tool_by_name(name, arguments)
        except ValueError as exc:
            if is_notification:
                return None
            return _jsonrpc_result(
                rpc_id, _tool_error_result(f"Tool argument error: {exc}", str(exc))
            )
        except KeyError as exc:
            if is_notification:
                return None
            message = exc.args[0] if exc.args else str(exc)
            return _jsonrpc_result(rpc_id, _tool_error_result(f"Tool error: {message}", message))
        except Exception as exc:  # pragma: no cover - defensive
            if is_notification:
                return None
            return jsonrpc_error(rpc_id, -32603, f"Internal error: {exc}")
        if is_notification:
            return None
        result = {
            "content": [{"type": "text", "text": summary}],
            "structuredContent": data,
            "isError": False,
        }
        return _jsonrpc_result(rpc_id, result)

    if is_notification:
        return None
    return jsonrpc_error(rpc_id, -32601, f"Method {method} not found")


def process_rpc_envelope(payload: Any):
    """Handle a single JSON-RPC request or a batch (list) of requests.

    For a batch, entries that produce no response are omitted from the result.
    """
    if isinstance(payload, list):
        responses = (process_rpc_request(entry) for entry in payload)
        return [response for response in responses if response is not None]
    if isinstance(payload, dict):
        return process_rpc_request(payload)
    return jsonrpc_error(None, -32600, "Invalid request")


def _request_limit(name: str, default: int, max_value: int) -> int:
    """Read an integer limit from the query string, bounded to ``[1, max_value]``.

    Missing or non-numeric values fall back to ``default`` (itself capped at
    ``max_value``) instead of raising and producing an HTTP 500.
    """
    return min(max_value, clamp_int(request.args.get(name), default, 1, max_value))


@app.route("/api/hosts")
def api_hosts():
    """REST: list hosts, filtered by repeated ``q`` terms and ``source``."""
    limit = _request_limit("limit", HOST_SEARCH_LIMIT, MAX_HOST_LIMIT)
    search_terms = parse_search_terms(request.args.getlist("q"))
    hosts = fetch_hosts(
        limit=limit,
        source=request.args.get("source"),
        search_terms=search_terms if search_terms else None,
    )
    return jsonify({"hosts": hosts, "total": len(hosts)})


# The <host_id> URL variable is required: the view takes host_id as an argument.
@app.route("/api/hosts/<host_id>")
def api_host_detail(host_id: str):
    """REST: fetch one host (404 if unknown), optionally with recent events."""
    host = fetch_host_by_id(host_id)
    if not host:
        abort(404, description=f"Host {host_id} not found")
    include_events = bool_arg(request.args.get("include_events"), default=False)
    result = {"host": host}
    if include_events:
        limit = _request_limit("events_limit", DEFAULT_EVENT_LIMIT, MAX_EVENT_LIMIT)
        result["events"] = fetch_events(host_id=host_id, limit=limit)
    return jsonify(result)


@app.route("/api/events")
def api_events():
    """REST: list recent events with optional host_id/type/since filters."""
    limit = _request_limit("limit", DEFAULT_EVENT_LIMIT, MAX_EVENT_LIMIT)
    events = fetch_events(
        host_id=request.args.get("host_id"),
        limit=limit,
        event_type=request.args.get("type"),
        since=request.args.get("since"),
    )
    return jsonify({"events": events, "total": len(events)})


@app.route("/api/hosts/<host_id>/events")
def api_host_events(host_id: str):
    """REST: list recent events for a single host."""
    limit = _request_limit("limit", DEFAULT_EVENT_LIMIT, MAX_EVENT_LIMIT)
    events = fetch_events(
        host_id=host_id,
        limit=limit,
        event_type=request.args.get("type"),
        since=request.args.get("since"),
    )
    return jsonify({"events": events, "total": len(events)})


@app.route("/api/map")
def api_map():
    """REST: hosts grouped by network."""
    limit = _request_limit("limit", HOST_SEARCH_LIMIT, MAX_HOST_LIMIT)
    hosts = fetch_hosts(limit=limit)
    network_map = build_network_map(hosts)
    return jsonify({"networks": network_map, "host_count": len(hosts)})


@app.route("/.well-known/mcp.json", methods=["GET", "POST", "OPTIONS"])
@app.route("/api/mcp", methods=["GET", "POST", "OPTIONS"])
def api_manifest():
    """Serve the REST manifest on GET and the JSON-RPC endpoint on POST."""
    if request.method == "OPTIONS":
        return ("", 204, {"Allow": "GET,POST,OPTIONS"})
    if request.method == "POST":
        payload = request.get_json(silent=True)
        if payload is None:
            return jsonify(jsonrpc_error(None, -32700, "Invalid JSON")), 400
        rpc_response = process_rpc_envelope(payload)
        # Notifications (and batches made only of them) produce no body.
        if rpc_response is None or (isinstance(rpc_response, list) and not rpc_response):
            return ("", 204)
        return jsonify(rpc_response)
    manifest = build_manifest(request.url_root.rstrip("/"))
    return jsonify(manifest)


@app.route("/")
def index():
    """Render the HTML overview of all hosts with summary counters."""
    hosts = fetch_hosts()
    total = len(hosts)
    with_ports = sum(1 for h in hosts if h["ports"])
    inventory_hosts = sum(1 for h in hosts if "inventory" in h["sources"])
    return render_template(
        "index.html",
        hosts=hosts,
        total=total,
        with_ports=with_ports,
        inventory_hosts=inventory_hosts,
        es_url=ES_URL,
    )


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.getenv("FRONTEND_PORT", "5001")))