Source code for driftmux.scanners.nmap

from __future__ import annotations

import shlex
import subprocess
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field

from driftmux.models import HostScanResult
from driftmux.parsers.nmap import NmapXmlParser


[docs] @dataclass(slots=True) class NmapScanner: timeout: int = 600 ports: str | None = None nmap_script: str | None = None extra_args: list[str] = field(default_factory=list) debug: bool = False
[docs] def scan(self, host: str) -> HostScanResult: if self.ports: return self._run_version_scan(host, self.ports) return self._run_version_discovery_scan(host)
def _run_version_discovery_scan(self, host: str) -> HostScanResult: command = [ "nmap", "-sV", "-Pn", "--open", ] if self.nmap_script: command.extend(["--script", self.nmap_script]) if self.extra_args: command.extend(self.extra_args) command.extend(["-oX", "-", host]) return self._run_and_parse(command, host, mode="single_phase_version_discovery", ports="default") def _run_version_scan(self, host: str, ports: str) -> HostScanResult: command = [ "nmap", "-sV", "-Pn", "-p", ports, ] if self.nmap_script: command.extend(["--script", self.nmap_script]) if self.extra_args: command.extend(self.extra_args) command.extend(["-oX", "-", host]) return self._run_and_parse(command, host, mode="direct_version", ports=ports) def _run_and_parse( self, command: list[str], host: str, *, mode: str, ports: str, ) -> HostScanResult: proc = self._run_command(command) result = HostScanResult(host=host) if proc.returncode not in (0, 1): result.add_error( "nmap", f"Nmap failed with exit code {proc.returncode}", proc.stderr.strip() or proc.stdout.strip(), ) return result try: parsed = NmapXmlParser.parse(proc.stdout, host) except ET.ParseError as exc: result.add_error("nmap", "Failed to parse Nmap XML", str(exc)) return result parsed.metadata.setdefault("nmap", {}).update( { "mode": mode, "ports": ports, } ) if self.debug: print("[DEBUG] parsed services:", len(parsed.services)) for service in parsed.services: print( "[DEBUG] parsed service:", service.port, service.protocol, service.state, service.service, service.product, service.version, service.cpes, ) return parsed def _run_command(self, command: list[str]) -> subprocess.CompletedProcess[str]: if self.debug: print("[DEBUG] Nmap command:", shlex.join(command)) proc = subprocess.run( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=self.timeout, check=False, ) if self.debug: print("[DEBUG] Nmap return code:", proc.returncode) print("[DEBUG] Nmap stderr:", proc.stderr[:1000]) print("[DEBUG] Nmap stdout preview:", proc.stdout[:4000]) return proc