import ipaddress
import json
import os
from typing import Any, Dict, List, Optional

import requests

from .logging_config import setup_logging

logger = setup_logging("opnsense_client")


class OPNsenseClient:
    """Small read-only client for the OPNsense REST API.

    Configuration is read from environment variables at construction time:

    * ``OPNSENSE_URL`` -- base URL (default ``https://192.168.1.1``); trailing
      slashes are stripped.
    * ``OPNSENSE_API_KEY`` / ``OPNSENSE_API_SECRET`` -- credentials passed to
      ``requests`` as the ``auth`` tuple.
    * ``OPNSENSE_VERIFY_SSL`` -- TLS certificate verification is enabled only
      when the value is ``"true"`` (case-insensitive). Falls back to
      ``ES_VERIFY_SSL`` and finally to ``"true"``.

    Every fetch is best effort: failures are logged and surface to callers as
    empty results rather than exceptions.
    """

    # Seconds to wait for a response before a request is abandoned.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        """Load connection settings from the environment."""
        self.base_url = os.getenv("OPNSENSE_URL", "https://192.168.1.1").rstrip("/")
        self.api_key = os.getenv("OPNSENSE_API_KEY")
        self.api_secret = os.getenv("OPNSENSE_API_SECRET")

        # Prefer the dedicated flag; keep honouring ES_VERIFY_SSL so existing
        # deployments that only set the shared flag behave as before.
        verify_flag = os.getenv(
            "OPNSENSE_VERIFY_SSL", os.getenv("ES_VERIFY_SSL", "true")
        )
        self.verify_ssl = verify_flag.lower() == "true"

        if not self.api_key or not self.api_secret:
            logger.warning("OPNSENSE_API_KEY or OPNSENSE_API_SECRET not set. API calls will fail.")

    def _get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Issue a GET request and return the decoded JSON body.

        Args:
            endpoint: Path appended verbatim to ``base_url``.
            params: Optional query-string parameters.

        Returns:
            The decoded JSON payload (whatever type the server sent), or an
            empty dict when the request, the HTTP status check, or the JSON
            decoding fails.
        """
        url = f"{self.base_url}{endpoint}"
        try:
            response = requests.get(
                url,
                auth=(self.api_key, self.api_secret),
                verify=self.verify_ssl,
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            # Deliberate best-effort boundary: callers treat "no data" and
            # "request failed" the same way, so log and return an empty dict.
            logger.error(f"Failed to fetch {url}: {e}")
            return {}

    @staticmethod
    def _extract_rows(payload: Any) -> List[Any]:
        """Normalise an API payload into a list of rows.

        Accepts either a bare list or a dict carrying a ``"rows"`` list. Any
        other shape (``None``, scalars, a non-list ``"rows"`` value) yields an
        empty list instead of raising.
        """
        if isinstance(payload, list):
            return payload
        if isinstance(payload, dict):
            rows = payload.get("rows", [])
            return rows if isinstance(rows, list) else []
        return []

    def get_dhcp_leases_v4(self) -> List[Any]:
        """Return DHCPv4 lease rows from ``/api/dhcpv4/leases/searchLease``.

        The response is expected to look like ``{"rows": [...], "total": ...}``.
        NOTE(review): the endpoint name has not been confirmed against every
        OPNsense version; search endpoints may also expect POST.

        Returns:
            The list of lease rows, or an empty list on failure or when the
            payload has an unexpected shape.
        """
        data = self._get("/api/dhcpv4/leases/searchLease")
        return self._extract_rows(data)

    def get_arp_table(self) -> List[Any]:
        """Return ARP entries from ``/api/diagnostics/interface/getArp``.

        The payload is accepted either as a bare list of entries or as a dict
        with a ``"rows"`` list.
        NOTE(review): API paths differ between OPNsense versions/plugins;
        confirm this one matches the target firewall.

        Returns:
            The list of ARP entries, or an empty list on failure or when the
            payload has an unexpected shape.
        """
        data = self._get("/api/diagnostics/interface/getArp")
        return self._extract_rows(data)

    def get_dns_overrides(self) -> List[Any]:
        """Return host override rows from ``/api/unbound/settings/searchHostOverride``.

        Returns:
            The list of override rows, or an empty list on failure or when the
            payload has an unexpected shape.
        """
        data = self._get("/api/unbound/settings/searchHostOverride")
        return self._extract_rows(data)

    def get_vlan_networks(self) -> List[Dict[str, str]]:
        """Build a list of IPv4 networks (CIDRs) from the routing table.

        Routes are read from ``/api/diagnostics/interface/getRoutes``. A route
        contributes an entry only when it is IPv4, has a CIDR destination, has
        an interface description, is not a /32 host route, and is not broader
        than /16. Entries are de-duplicated on (description, CIDR) and keep
        the order in which they first appear.

        Returns:
            A list of ``{"key": desc, "name": desc, "cidr": cidr}`` dicts, or
            an empty list when the routing table is unavailable or is not a
            list.
        """
        routes = self._get("/api/diagnostics/interface/getRoutes")
        if not isinstance(routes, list):
            return []

        networks = []
        seen = set()
        for route in routes:
            # Ignore malformed entries rather than failing the whole scan.
            if not isinstance(route, dict) or route.get("proto") != "ipv4":
                continue

            # Requiring a "/" also filters out the literal "default" route.
            destination = route.get("destination")
            if not isinstance(destination, str) or "/" not in destination:
                continue

            desc = route.get("intf_description")
            if not desc:
                continue

            try:
                network = ipaddress.ip_network(destination, strict=False)
            except ValueError:
                continue

            # Skip host routes (/32) which are usually static peers.
            if network.prefixlen == 32:
                continue
            # Skip very broad routes (prefix shorter than /16).
            if network.prefixlen < 16:
                continue

            cidr = str(network)
            key = (desc, cidr)
            if key in seen:
                continue
            seen.add(key)
            networks.append({
                "key": desc,
                "name": desc,
                "cidr": cidr,
            })
        return networks


def get_opnsense_client():
    """Return a new ``OPNsenseClient`` configured from the environment."""
    return OPNsenseClient()