Source code for driftmux.engine

from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable

from driftmux.planner import build_scan_plan
from driftmux.models import HostScanResult
from driftmux.scanners.nmap import NmapScanner
from driftmux.scanners.nuclei import NucleiScanner
from driftmux.scanners.nvd import NvdConfig, NvdCveScanner
from driftmux.scanners.plecost import PlecostScanner


WEB_PORTS = {
    80,
    81,
    443,
    444,
    591,
    593,
    8000,
    8008,
    8080,
    8081,
    8088,
    8443,
    8888,
    9443,
}


[docs] def is_web_candidate(service) -> bool: svc = (getattr(service, "service", "") or "").lower() product = (getattr(service, "product", "") or "").lower() port = int(getattr(service, "port", 0) or 0) if "http" in svc or "https" in svc: return True if any(x in product for x in ("apache", "nginx", "tomcat", "jetty", "nextcloud")): return True if port in WEB_PORTS: return True return False
[docs] def guess_scheme(service, configured_scheme: str) -> str: if configured_scheme in {"http", "https"}: return configured_scheme port = int(getattr(service, "port", 0) or 0) if port in {443, 8443, 9443}: return "https" if port in {80, 81, 8000, 8008, 8080, 8081, 8088, 8888}: return "http" return "auto"
[docs] @dataclass(slots=True) class ScanConfig: ports: str | None = None nmap_script: str | None = None timeout: int = 120 web_scheme: str = "auto" nuclei_profile: str = "fast" output_format: str = "json" output_dir: str = "reports" log_dir: str = "logs" deep_wordpress: bool = False # NVD backend vuln_backend: str = "none" min_cvss: float = 0.0 nvd_api_key: str | None = None nvd_cache: str = "~/.cache/driftmux/nvd.sqlite" nvd_cache_ttl_hours: int = 168
[docs] class DriftmuxEngine: def __init__(self, config: ScanConfig): self.config = config self.nmap = NmapScanner( timeout=config.timeout, ports=config.ports, nmap_script=config.nmap_script, ) self.nvd = NvdCveScanner( NvdConfig( enabled=config.vuln_backend == "nvd", api_key=config.nvd_api_key, min_cvss=config.min_cvss, cache_path=config.nvd_cache, cache_ttl_hours=config.nvd_cache_ttl_hours, timeout=max(20, min(config.timeout, 60)), ) ) self.nuclei = NucleiScanner( timeout=max(config.timeout, 180), profile=getattr(config, "nuclei_profile", "fast"), ) self.plecost = PlecostScanner( timeout=max(config.timeout, 180), mode=config.web_scheme, deep=config.deep_wordpress, )
[docs] def scan_host(self, host: str) -> HostScanResult: discovery = self.nmap.scan(host) final = HostScanResult( host=host, services=discovery.services.copy(), findings=discovery.findings.copy(), errors=discovery.errors.copy(), metadata=discovery.metadata.copy(), ) final.metadata["config"] = { "ports": self.config.ports, "nmap_script": self.config.nmap_script, "timeout": self.config.timeout, "web_scheme": self.config.web_scheme, "nuclei_profile": self.config.nuclei_profile, "vuln_backend": self.config.vuln_backend, "min_cvss": self.config.min_cvss, "deep_wordpress": self.config.deep_wordpress, } if self.config.vuln_backend == "nvd": nvd_result = self.nvd.scan(host, discovery.services) final.findings.extend(nvd_result.findings) final.errors.extend(nvd_result.errors) final.metadata.update(nvd_result.metadata) if self.config.nuclei_profile != "passive": self._run_nuclei(host, discovery, final) self._run_plecost(host, discovery, final) return final
def _run_nuclei( self, host: str, discovery: HostScanResult, final: HostScanResult, ) -> None: plan = build_scan_plan( host=host, services=discovery.services, passive_findings=final.findings, scheme=self.config.web_scheme, profile=self.config.nuclei_profile, ) nuclei_result = self.nuclei.scan_many( host=host, targets=plan.nuclei_targets, ) final.findings.extend(nuclei_result.findings) final.errors.extend(nuclei_result.errors) def _run_plecost( self, host: str, discovery: HostScanResult, final: HostScanResult, ) -> None: wp_done = False for service in discovery.services: if not wp_done and self.plecost.maybe_wordpress(host, service): plecost_result = self.plecost.scan(host, service) final.findings.extend(plecost_result.findings) final.errors.extend(plecost_result.errors) final.metadata.update(plecost_result.metadata) wp_done = True if not wp_done and self.plecost.maybe_wordpress(host): plecost_result = self.plecost.scan(host) final.findings.extend(plecost_result.findings) final.errors.extend(plecost_result.errors) final.metadata.update(plecost_result.metadata)
[docs] def scan_hosts(self, hosts: Iterable[str]) -> list[HostScanResult]: return [self.scan_host(host) for host in hosts]