diff --git a/cyberai/agents/exploit/agent.py b/cyberai/agents/exploit/agent.py index 007160b..e16b90b 100644 --- a/cyberai/agents/exploit/agent.py +++ b/cyberai/agents/exploit/agent.py @@ -19,6 +19,8 @@ from .attack_path import AttackPath, build_attack_paths from .chain_builder import build_exploit_chain +from .nuclei_engine import NucleiEngine +from .poc_mapper import lookup_poc from cyberai.core.types import ( AttackPath as AttackPathModel, ExploitChain as ExploitChainModel, @@ -100,6 +102,10 @@ def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str else: chain = self.call_tool("build_chain", cves=ranked_cves[:3], target=target) + nuclei_results: List[Dict[str, Any]] = [] + if getattr(self.config, "use_nuclei", False): + nuclei_results = self._run_nuclei(target, ranked_cves) + analysis = self._ai_analysis(target, ranked_cves, attack_paths, chain) self._print_attack_table(attack_paths) @@ -107,6 +113,7 @@ def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str "attack_paths": [p.to_dict() for p in attack_paths], "exploit_chain": chain, "ai_analysis": analysis, + "nuclei": nuclei_results, } # Build a validated ExploitResult and store it in the KB. exploit_result = ExploitResult( @@ -183,6 +190,58 @@ def _exec_native_tool(self, tc: Any, ranked_cves: List[Dict], target: str) -> An return build_exploit_chain(ranked_cves[:3], tgt) return {"error": f"unknown tool: {tc.name}"} + # OOB-trigger heuristics: techniques that confirm via out-of-band callback. + _OOB_MARKERS = ("jndi", "ssrf", "oob", "callback", "log4", "rce") + + def _cve_needs_oob(self, cve_id: str) -> bool: + """Decide if a CVE template should receive a phantom-grid OOB host. + + Uses the internal poc_mapper technique text + severity as a heuristic. 
+ """ + info = lookup_poc(cve_id) + blob = " ".join(str(info.get(k, "")) for k in ("technique", "name", "poc_hint")).lower() + if any(m in blob for m in self._OOB_MARKERS): + return True + return str(info.get("severity", "")).upper() == "CRITICAL" + + def _run_nuclei(self, target: str, ranked_cves: List[Dict]) -> List[Dict[str, Any]]: + """Flag-gated: run nuclei per CVE, wiring phantom-grid OOB when needed. + + For CVEs whose technique implies an out-of-band callback, the + phantom-grid capture URL is injected via nuclei -var so templates that + read a custom OOB host hit our collector instead of the public one. + Returns [] when nuclei is unavailable. + """ + engine = NucleiEngine() + if not engine.available: + self._log("nuclei unavailable — skipping template engine") + return [] + + # Mint one OOB host for any CVE that needs it. + oob_host = "" + grid = PhantomGridClient() + if grid.available and any( + self._cve_needs_oob(c.get("cve_id", "")) for c in ranked_cves[:5] + ): + token = grid.create_token(label=f"nuclei-{target}") + if token: + oob_host = grid.capture_url(token) + + results: List[Dict[str, Any]] = [] + for cve in ranked_cves[:5]: + self._check_iteration_limit() + cve_id = cve.get("cve_id", "") + if not cve_id: + continue + if oob_host and self._cve_needs_oob(cve_id): + engine.extra_vars = {"oob": oob_host} + else: + engine.extra_vars = {} + findings = engine.run(target, cve_id=cve_id) + results.extend(f.to_dict() for f in findings) + self._log("nuclei run complete", {"findings": len(results)}) + return results + def _ai_analysis( self, target: str, diff --git a/cyberai/agents/exploit/nuclei_engine.py b/cyberai/agents/exploit/nuclei_engine.py new file mode 100644 index 0000000..9f20d47 --- /dev/null +++ b/cyberai/agents/exploit/nuclei_engine.py @@ -0,0 +1,170 @@ +"""Nuclei exploit engine — CVE/tag-driven template execution (day 23). 
logger = logging.getLogger("cyberai.exploit.nuclei")

# Fallback locations checked when nuclei is not on PATH.
_FALLBACK_PATHS = [
    os.path.expanduser("~/go/bin/nuclei"),
    "/usr/local/bin/nuclei",
    "/usr/bin/nuclei",
]

DEFAULT_TIMEOUT = 300  # nuclei runs can be slow; 5 min ceiling


def find_nuclei() -> Optional[str]:
    """Locate the nuclei binary: NUCLEI_PATH env, then PATH, then fallback dirs.

    Returns:
        The first existing candidate path, or None when nuclei is not found.
    """
    env = os.getenv("NUCLEI_PATH")
    if env and os.path.exists(env):
        return env
    found = shutil.which("nuclei")
    if found:
        return found
    for p in _FALLBACK_PATHS:
        if os.path.exists(p):
            return p
    return None


@dataclass
class NucleiFinding:
    """One parsed nuclei JSONL result line."""

    template_id: str
    name: str
    severity: str
    host: str
    matched_at: str
    type: str = ""
    cve_id: Optional[str] = None
    # Full decoded JSON object for the line; not included in to_dict().
    raw: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the finding as a plain dict (without the ``raw`` payload)."""
        return {
            "template_id": self.template_id,
            "name": self.name,
            "severity": self.severity,
            "host": self.host,
            "matched_at": self.matched_at,
            "type": self.type,
            "cve_id": self.cve_id,
        }


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _parse_line(line: str) -> Optional[NucleiFinding]:
    """Parse one JSONL line from nuclei into a NucleiFinding.

    Returns None for blank lines, undecodable JSON, and JSON values that are
    not objects (e.g. ``[]`` or ``42``), so stray output never raises.
    """
    line = line.strip()
    if not line:
        return None
    try:
        d = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(d, dict):
        # Valid JSON but not a result object — nothing to extract.
        return None
    info = _as_dict(d.get("info"))
    classification = _as_dict(info.get("classification"))
    cve = classification.get("cve-id")
    # cve-id can be a list, a string, or null
    if isinstance(cve, list):
        cve = cve[0] if cve else None
    return NucleiFinding(
        template_id=d.get("template-id", ""),
        name=info.get("name", ""),
        severity=info.get("severity", "unknown"),
        host=d.get("host", ""),
        matched_at=d.get("matched-at", d.get("url", "")),
        type=d.get("type", ""),
        cve_id=cve,
        raw=d,
    )


def parse_jsonl(output: str) -> List[NucleiFinding]:
    """Parse multi-line nuclei JSONL output into findings.

    Lines that do not yield a finding (blank, garbage, non-object JSON) are
    skipped.
    """
    findings = []
    for line in output.splitlines():
        f = _parse_line(line)
        if f is not None:
            findings.append(f)
    return findings


class NucleiEngine:
    """Runs nuclei templates against a target by CVE id or tags."""

    def __init__(
        self,
        nuclei_path: Optional[str] = None,
        timeout: int = DEFAULT_TIMEOUT,
        extra_vars: Optional[Dict[str, str]] = None,
    ):
        """Configure the engine.

        Args:
            nuclei_path: Explicit binary path; discovered via find_nuclei()
                when omitted.
            timeout: Per-run subprocess timeout in seconds.
            extra_vars: Template variables injected via -var.
        """
        self.nuclei_path = nuclei_path or find_nuclei()
        self.timeout = timeout
        # Template variables injected via -var (e.g. OOB callback host).
        # Copied so later mutation never leaks into the caller's dict.
        self.extra_vars = dict(extra_vars) if extra_vars else {}

    @property
    def available(self) -> bool:
        """True when a nuclei path is set and exists on disk."""
        return bool(self.nuclei_path and os.path.exists(self.nuclei_path))

    def _base_cmd(self, target: str) -> List[str]:
        """Build the argv shared by every run: target, output flags, -var pairs."""
        cmd = [
            self.nuclei_path or "nuclei",
            "-target",
            target,
            "-jsonl",
            "-silent",
            "-no-color",
            "-omit-raw",  # drop request/response — keeps JSON compact
        ]
        for k, v in self.extra_vars.items():
            cmd += ["-var", f"{k}={v}"]
        return cmd

    def run(
        self,
        target: str,
        cve_id: Optional[str] = None,
        tags: Optional[List[str]] = None,
    ) -> List[NucleiFinding]:
        """Run nuclei against target, scoped by CVE id or tags.

        Args:
            target: Host/URL passed to ``-target``.
            cve_id: Template id passed to ``-id`` when given.
            tags: Tags joined with commas and passed to ``-tags`` when given.

        Returns:
            Parsed findings from stdout. Returns [] (and logs) when the binary
            is unavailable, the run times out, or execution fails.
        """
        if not self.available:
            logger.warning("nuclei binary not found — skipping template run")
            return []
        cmd = self._base_cmd(target)
        if cve_id:
            cmd += ["-id", cve_id]
        if tags:
            cmd += ["-tags", ",".join(tags)]
        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=self.timeout,
            )
        except subprocess.TimeoutExpired:
            logger.warning("nuclei timed out after %ss", self.timeout)
            return []
        except Exception as exc:  # noqa: BLE001 — never hard-fail the pipeline
            logger.warning("nuclei execution failed: %s", exc)
            return []
        if proc.returncode != 0:
            # Surface failures instead of silently reporting "no findings";
            # stdout is still parsed in case partial results were emitted.
            logger.warning(
                "nuclei exited with code %s: %s",
                proc.returncode,
                (proc.stderr or "").strip()[:500],
            )
        return parse_jsonl(proc.stdout)
logger = logging.getLogger("cyberai.exploit.searchsploit")

_FALLBACK_PATHS = [
    "/usr/bin/searchsploit",
    "/opt/exploitdb/searchsploit",
    "/usr/local/bin/searchsploit",
]

DEFAULT_TIMEOUT = 30


def find_searchsploit() -> Optional[str]:
    """Locate the searchsploit binary: env, PATH, then known fallback dirs.

    Returns:
        The first existing candidate path, or None when it is not found.
    """
    env = os.getenv("SEARCHSPLOIT_PATH")
    if env and os.path.exists(env):
        return env
    found = shutil.which("searchsploit")
    if found:
        return found
    for p in _FALLBACK_PATHS:
        if os.path.exists(p):
            return p
    return None


@dataclass
class ExploitRecord:
    """One Exploit-DB PoC entry."""

    title: str
    edb_id: str
    path: str
    type: str = ""
    platform: str = ""
    date: str = ""
    # Original JSON object for the entry; not included in to_dict().
    raw: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict (without the ``raw`` payload)."""
        return {
            "title": self.title,
            "edb_id": self.edb_id,
            "path": self.path,
            "type": self.type,
            "platform": self.platform,
            "date": self.date,
        }


def _parse_record(raw: Dict[str, Any]) -> ExploitRecord:
    """Map one ``RESULTS_EXPLOIT`` entry onto an ExploitRecord.

    ``EDB-ID`` is stringified; the date comes from ``Date_Published`` and
    falls back to ``Date``.
    """
    return ExploitRecord(
        title=raw.get("Title", ""),
        edb_id=str(raw.get("EDB-ID", "")),
        path=raw.get("Path", ""),
        type=raw.get("Type", ""),
        platform=raw.get("Platform", ""),
        date=raw.get("Date_Published", raw.get("Date", "")),
        raw=raw,
    )


def parse_output(output: str) -> List[ExploitRecord]:
    """Parse `searchsploit -j` JSON into ExploitRecords (exploits only).

    Returns [] for empty output, undecodable JSON, a top-level value that is
    not an object, or a ``RESULTS_EXPLOIT`` that is not a list. Entries that
    are not objects are skipped rather than raising.
    """
    output = output.strip()
    if not output:
        return []
    try:
        data = json.loads(output)
    except json.JSONDecodeError:
        return []
    if not isinstance(data, dict):
        return []
    results = data.get("RESULTS_EXPLOIT") or []
    if not isinstance(results, list):
        return []
    return [_parse_record(r) for r in results if isinstance(r, dict)]
class SearchSploit:
    """Queries Exploit-DB via the searchsploit binary."""

    def __init__(
        self,
        searchsploit_path: Optional[str] = None,
        timeout: int = DEFAULT_TIMEOUT,
    ):
        """Configure the wrapper.

        Args:
            searchsploit_path: Explicit binary path; discovered via
                find_searchsploit() when omitted.
            timeout: Subprocess timeout in seconds.
        """
        self.searchsploit_path = searchsploit_path or find_searchsploit()
        self.timeout = timeout

    @property
    def available(self) -> bool:
        """True when a searchsploit path is set and exists on disk."""
        return bool(self.searchsploit_path and os.path.exists(self.searchsploit_path))

    def search(self, term: str) -> List[ExploitRecord]:
        """Search Exploit-DB for a CVE id or keyword. [] if unavailable.

        Args:
            term: CVE id or keyword. Blank terms and terms starting with "-"
                are rejected (the latter would be parsed by searchsploit as a
                command-line option rather than a search term).

        Returns:
            Parsed exploit records, or [] when the binary is unavailable, the
            term is rejected, the run times out, or execution fails.
        """
        if not self.available:
            logger.warning("searchsploit not found — skipping PoC lookup")
            return []
        query = term.strip() if isinstance(term, str) else ""
        if not query:
            logger.warning("searchsploit: empty search term — skipping PoC lookup")
            return []
        if query.startswith("-"):
            # Terms can originate from scan data; never let one act as a flag.
            logger.warning("searchsploit: refusing option-like term %r", query)
            return []
        cmd = [self.searchsploit_path or "searchsploit", "-j", query]
        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=self.timeout,
            )
        except subprocess.TimeoutExpired:
            logger.warning("searchsploit timed out after %ss", self.timeout)
            return []
        except Exception as exc:  # noqa: BLE001 — never hard-fail
            logger.warning("searchsploit execution failed: %s", exc)
            return []
        return parse_output(proc.stdout)

    def search_cve(self, cve_id: str) -> List[ExploitRecord]:
        """Convenience: search by CVE id."""
        return self.search(cve_id)
"""Day 23 — nuclei engine + searchsploit: parsing, subprocess mock, OOB wiring."""

from __future__ import annotations

import json
import subprocess
from unittest.mock import MagicMock, patch

from cyberai.agents.exploit.nuclei_engine import (
    NucleiEngine,
    NucleiFinding,
    find_nuclei,
    parse_jsonl,
)
from cyberai.agents.exploit.searchsploit import (
    ExploitRecord,
    SearchSploit,
    parse_output,
)


# ── nuclei JSONL parser ───────────────────────────────────────────────

# Real v3.8.0 line shapes captured from a live run.
# Result line whose classification carries a null cve-id.
_WAF_LINE = json.dumps(
    {
        "template-id": "waf-detect",
        "info": {
            "name": "WAF Detection",
            "severity": "info",
            "classification": {"cve-id": None, "cwe-id": ["cwe-200"]},
        },
        "type": "http",
        "host": "scanme.nmap.org",
        "matched-at": "http://scanme.nmap.org",
    }
)
# Result line whose cve-id is a one-element list.
_CVE_LINE = json.dumps(
    {
        "template-id": "CVE-2021-44228",
        "info": {
            "name": "Log4Shell",
            "severity": "critical",
            "classification": {"cve-id": ["CVE-2021-44228"]},
        },
        "type": "http",
        "host": "victim.local",
        "matched-at": "http://victim.local/api",
    }
)


def test_parse_single_line():
    """A single JSONL line yields one finding; a null cve-id becomes None."""
    fs = parse_jsonl(_WAF_LINE)
    assert len(fs) == 1
    f = fs[0]
    assert f.template_id == "waf-detect"
    assert f.severity == "info"
    assert f.cve_id is None  # null handled


def test_parse_cve_id_list_takes_first():
    """When cve-id is a list, the first element is used."""
    f = parse_jsonl(_CVE_LINE)[0]
    assert f.cve_id == "CVE-2021-44228"
    assert f.severity == "critical"


def test_parse_multiline_and_garbage():
    """Blank and undecodable lines are skipped without dropping valid ones."""
    blob = _WAF_LINE + "\n\ngarbage{\n" + _CVE_LINE
    fs = parse_jsonl(blob)
    assert len(fs) == 2  # garbage line skipped


def test_parse_empty():
    """Empty output parses to an empty list."""
    assert parse_jsonl("") == []


# ── NucleiEngine.run (subprocess mocked) ──────────────────────────────


@patch("cyberai.agents.exploit.nuclei_engine.os.path.exists", return_value=True)
@patch("cyberai.agents.exploit.nuclei_engine.subprocess.run")
def test_run_parses_stdout(mock_run, _exists):
    """run() parses mocked stdout and passes -id plus the output flags."""
    mock_run.return_value = MagicMock(stdout=_CVE_LINE, returncode=0)
    eng = NucleiEngine(nuclei_path="/fake/nuclei")
    findings = eng.run("victim.local", cve_id="CVE-2021-44228")
    assert len(findings) == 1
    assert findings[0].cve_id == "CVE-2021-44228"
    # command carries -id and the jsonl/omit-raw flags
    cmd = mock_run.call_args.args[0]
    assert "-id" in cmd and "CVE-2021-44228" in cmd
    assert "-jsonl" in cmd and "-omit-raw" in cmd


@patch("cyberai.agents.exploit.nuclei_engine.os.path.exists", return_value=True)
@patch("cyberai.agents.exploit.nuclei_engine.subprocess.run")
def test_run_tags(mock_run, _exists):
    """Tags are joined with commas and passed after -tags."""
    mock_run.return_value = MagicMock(stdout="", returncode=0)
    eng = NucleiEngine(nuclei_path="/fake/nuclei")
    eng.run("victim.local", tags=["cve", "rce"])
    cmd = mock_run.call_args.args[0]
    assert "-tags" in cmd
    assert "cve,rce" in cmd


@patch("cyberai.agents.exploit.nuclei_engine.os.path.exists", return_value=True)
@patch("cyberai.agents.exploit.nuclei_engine.subprocess.run")
def test_run_injects_oob_var(mock_run, _exists):
    """extra_vars set on the engine appear as -var key=value arguments."""
    mock_run.return_value = MagicMock(stdout="", returncode=0)
    eng = NucleiEngine(nuclei_path="/fake/nuclei")
    eng.extra_vars = {"oob": "http://grid.local/c/tok1"}
    eng.run("victim.local", cve_id="CVE-2021-44228")
    cmd = mock_run.call_args.args[0]
    assert "-var" in cmd
    assert "oob=http://grid.local/c/tok1" in cmd


@patch("cyberai.agents.exploit.nuclei_engine.os.path.exists", return_value=True)
@patch("cyberai.agents.exploit.nuclei_engine.subprocess.run")
def test_run_timeout_returns_empty(mock_run, _exists):
    """A subprocess timeout is swallowed and reported as no findings."""
    mock_run.side_effect = subprocess.TimeoutExpired(cmd="nuclei", timeout=1)
    eng = NucleiEngine(nuclei_path="/fake/nuclei")
    assert eng.run("victim.local", cve_id="CVE-X") == []


@patch("cyberai.agents.exploit.nuclei_engine.find_nuclei", return_value=None)
def test_run_unavailable_returns_empty(_find):
    """With no binary discovered, the engine is unavailable and run() is a no-op."""
    eng = NucleiEngine()
    assert eng.available is False
    assert eng.run("victim.local", cve_id="CVE-X") == []


def test_find_nuclei_env(monkeypatch, tmp_path):
    """An existing NUCLEI_PATH is returned by find_nuclei()."""
    fake = tmp_path / "nuclei"
    fake.write_text("#!/bin/sh\n")
    monkeypatch.setenv("NUCLEI_PATH", str(fake))
    assert find_nuclei() == str(fake)


# ── searchsploit parser + graceful ────────────────────────────────────

# Sample `searchsploit -j` payload with one exploit and no shellcode.
_SS_JSON = json.dumps(
    {
        "SEARCH": "CVE-2021-44228",
        "RESULTS_EXPLOIT": [
            {
                "Title": "Apache Log4j 2 - RCE (Log4Shell)",
                "EDB-ID": "50592",
                "Path": "/opt/exploitdb/exploits/java/remote/50592.py",
                "Type": "remote",
                "Platform": "java",
                "Date_Published": "2021-12-14",
            }
        ],
        "RESULTS_SHELLCODE": [],
    }
)


def test_searchsploit_parse():
    """RESULTS_EXPLOIT entries are mapped onto ExploitRecord fields."""
    recs = parse_output(_SS_JSON)
    assert len(recs) == 1
    assert recs[0].edb_id == "50592"
    assert recs[0].path.endswith("50592.py")
    assert recs[0].platform == "java"


def test_searchsploit_parse_empty_and_garbage():
    """Empty output, non-JSON output and an empty result list all give []."""
    assert parse_output("") == []
    assert parse_output("not json") == []
    assert parse_output('{"SEARCH":"x","RESULTS_EXPLOIT":[]}') == []


@patch("cyberai.agents.exploit.searchsploit.os.path.exists", return_value=True)
@patch("cyberai.agents.exploit.searchsploit.subprocess.run")
def test_searchsploit_search(mock_run, _exists):
    """search_cve() runs the binary with -j and the CVE id, then parses stdout."""
    mock_run.return_value = MagicMock(stdout=_SS_JSON, returncode=0)
    ss = SearchSploit(searchsploit_path="/fake/searchsploit")
    recs = ss.search_cve("CVE-2021-44228")
    assert len(recs) == 1
    assert isinstance(recs[0], ExploitRecord)
    cmd = mock_run.call_args.args[0]
    assert "-j" in cmd and "CVE-2021-44228" in cmd


@patch("cyberai.agents.exploit.searchsploit.find_searchsploit", return_value=None)
def test_searchsploit_unavailable(_find):
    """With no binary discovered, search() returns [] without running anything."""
    ss = SearchSploit()
    assert ss.available is False
    assert ss.search("anything") == []


# ── ExploitAgent OOB heuristic + _run_nuclei wiring ───────────────────


def _agent():
    """Build a bare ExploitAgent (no __init__) with only the attributes these tests set."""
    from cyberai.agents.exploit.agent import ExploitAgent

    agent = ExploitAgent.__new__(ExploitAgent)
    agent.AGENT_NAME = "exploit"
    agent._log = MagicMock()
    agent._iterations = 0
    agent.config = MagicMock()
    agent.config.max_agent_iterations = 10
    return agent


def test_cve_needs_oob_jndi():
    """Log4Shell is classified as needing an OOB callback."""
    agent = _agent()
    # Log4Shell is in the internal poc_mapper with JNDI technique.
    assert agent._cve_needs_oob("CVE-2021-44228") is True


def test_run_nuclei_unavailable_engine(monkeypatch):
    """_run_nuclei returns [] when the engine reports itself unavailable."""
    import cyberai.agents.exploit.agent as ag

    monkeypatch.setattr(ag, "NucleiEngine", lambda *a, **k: MagicMock(available=False))
    agent = _agent()
    assert agent._run_nuclei("victim.local", [{"cve_id": "CVE-2021-44228"}]) == []


def test_run_nuclei_collects_findings(monkeypatch):
    """Findings from the engine are returned as dicts even without an OOB host."""
    import cyberai.agents.exploit.agent as ag

    fake_engine = MagicMock()
    fake_engine.available = True
    fake_engine.extra_vars = {}
    fake_engine.run.return_value = [
        NucleiFinding(
            template_id="CVE-2021-44228",
            name="Log4Shell",
            severity="critical",
            host="victim.local",
            matched_at="http://victim.local",
            cve_id="CVE-2021-44228",
        )
    ]
    monkeypatch.setattr(ag, "NucleiEngine", lambda *a, **k: fake_engine)
    # grid unavailable -> no oob host, but findings still collected
    monkeypatch.setattr(ag, "PhantomGridClient", lambda *a, **k: MagicMock(available=False))
    agent = _agent()
    res = agent._run_nuclei("victim.local", [{"cve_id": "CVE-2021-44228"}])
    assert len(res) == 1
    assert res[0]["cve_id"] == "CVE-2021-44228"