import subprocess import xml.etree.ElementTree as ET import shutil from typing import List, Dict, Optional from .logging_config import setup_logging logger = setup_logging("nmap_parser") def run_nmap_scan(ips: List[str], extra_args: Optional[List[str]] = None) -> List[Dict]: """ Run nmap on the given IPs and return a list of parsed host dicts. """ if not ips: return [] if not shutil.which("nmap"): logger.error("nmap binary not found in PATH") return [] # Default args: -oX - (XML to stdout) cmd = ["nmap", "-oX", "-"] if extra_args: cmd.extend(extra_args) # Append IPs cmd.extend(ips) logger.info(f"Running nmap command: {' '.join(cmd)}") try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) xml_output = result.stdout return parse_nmap_xml(xml_output) except subprocess.CalledProcessError as e: logger.error(f"Nmap failed: {e.stderr}") return [] except Exception as e: logger.error(f"Error running nmap: {e}") return [] def parse_nmap_xml(xml_string: str) -> List[Dict]: """ Parse Nmap XML output into our internal host/port structure. """ try: root = ET.fromstring(xml_string) except ET.ParseError as e: logger.error(f"Failed to parse Nmap XML: {e}") return [] hosts = [] for host_node in root.findall("host"): # Helper to find basic info ip = None mac = None hostname = None vendor = None # Addresses for addr in host_node.findall("address"): addr_type = addr.get("addrtype") if addr_type == "ipv4": ip = addr.get("addr") elif addr_type == "mac": mac = addr.get("addr") vendor = addr.get("vendor") # Hostnames hostnames_node = host_node.find("hostnames") if hostnames_node is not None: # Pick first for now hn = hostnames_node.find("hostname") if hn is not None: hostname = hn.get("name") # Ports ports = [] ports_node = host_node.find("ports") if ports_node is not None: for port_node in ports_node.findall("port"): state_node = port_node.find("state") state = state_node.get("state") if state_node is not None else "unknown" # Only care about open ports usually, but keep all for now if needed if state != "open": continue port_id = int(port_node.get("portid")) protocol = port_node.get("protocol") service_node = port_node.find("service") service_name = service_node.get("name") if service_node is not None else "unknown" product = service_node.get("product") if service_node is not None else None version = service_node.get("version") if service_node is not None else None service_def = { "name": service_name, } if product: service_def["product"] = product if version: service_def["version"] = version ports.append({ "port": port_id, "proto": protocol, "state": state, "service": service_def }) # OS detection (basic) os_match = None os_node = host_node.find("os") if os_node is not None: os_match_node = os_node.find("osmatch") if os_match_node is not None: os_match = { "name": os_match_node.get("name"), "accuracy": os_match_node.get("accuracy") } host_data = { "ip": ip, "mac": mac, # might be None if scanning remote segment "hostname": hostname, "vendor": vendor, "ports": ports, "os_match": os_match } hosts.append(host_data) return hosts