Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions cyberai/agents/exploit/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

from .attack_path import AttackPath, build_attack_paths
from .chain_builder import build_exploit_chain
from .nuclei_engine import NucleiEngine
from .poc_mapper import lookup_poc
from cyberai.core.types import (
AttackPath as AttackPathModel,
ExploitChain as ExploitChainModel,
Expand Down Expand Up @@ -100,13 +102,18 @@ def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str
else:
chain = self.call_tool("build_chain", cves=ranked_cves[:3], target=target)

nuclei_results: List[Dict[str, Any]] = []
if getattr(self.config, "use_nuclei", False):
nuclei_results = self._run_nuclei(target, ranked_cves)

analysis = self._ai_analysis(target, ranked_cves, attack_paths, chain)

self._print_attack_table(attack_paths)
result = {
"attack_paths": [p.to_dict() for p in attack_paths],
"exploit_chain": chain,
"ai_analysis": analysis,
"nuclei": nuclei_results,
}
# Build a validated ExploitResult and store it in the KB.
exploit_result = ExploitResult(
Expand Down Expand Up @@ -183,6 +190,58 @@ def _exec_native_tool(self, tc: Any, ranked_cves: List[Dict], target: str) -> An
return build_exploit_chain(ranked_cves[:3], tgt)
return {"error": f"unknown tool: {tc.name}"}

# OOB-trigger heuristics: techniques that confirm via out-of-band callback.
# _cve_needs_oob() looks for these lowercase markers in the lowercased
# poc_mapper text (technique / name / poc_hint) of a CVE.
_OOB_MARKERS = ("jndi", "ssrf", "oob", "callback", "log4", "rce")

def _cve_needs_oob(self, cve_id: str) -> bool:
    """Decide if a CVE template should receive a phantom-grid OOB host.

    Uses the internal poc_mapper technique text + severity as a heuristic:
    the CVE qualifies when its ``technique`` / ``name`` / ``poc_hint`` text
    contains one of ``_OOB_MARKERS``, or when its severity is CRITICAL.

    A marker only counts when it starts an alphabetic run, so "rce" matches
    "pre-auth RCE" and "log4" matches "log4shell", while unrelated words
    that merely contain a marker ("WooCommerce", "resource", "brute force")
    no longer trigger an OOB host.

    Args:
        cve_id: CVE identifier passed to ``lookup_poc``.

    Returns:
        True when the CVE should be run with the OOB callback host.
    """
    import re  # local import: only this heuristic needs it

    # Fall back to an empty mapping in case lookup_poc returns None.
    info = lookup_poc(cve_id) or {}
    blob = " ".join(str(info.get(k, "")) for k in ("technique", "name", "poc_hint")).lower()
    alternatives = "|".join(re.escape(marker) for marker in self._OOB_MARKERS)
    # (?<![a-z]) rejects hits that are the tail of a longer word.
    if alternatives and re.search(rf"(?<![a-z])(?:{alternatives})", blob):
        return True
    return str(info.get("severity", "")).upper() == "CRITICAL"

def _run_nuclei(self, target: str, ranked_cves: List[Dict]) -> List[Dict[str, Any]]:
    """Flag-gated: run nuclei per CVE, wiring phantom-grid OOB when needed.

    For CVEs whose technique implies an out-of-band callback, the
    phantom-grid capture URL is injected via nuclei -var so templates that
    read a custom OOB host hit our collector instead of the public one.
    Only the first five ranked CVEs are considered.

    The OOB heuristic is evaluated once per CVE id (entries without an id
    are ignored) and the result is reused both for deciding whether to mint
    a token and for the per-CVE ``-var`` wiring.

    Args:
        target: Target handed to nuclei.
        ranked_cves: CVE dicts; each is expected to carry a ``cve_id`` key.

    Returns:
        Finding dicts from all runs; [] when nuclei is unavailable.
    """
    engine = NucleiEngine()
    if not engine.available:
        self._log("nuclei unavailable — skipping template engine")
        return []

    candidates = ranked_cves[:5]

    # Work out which CVE ids want an OOB host; skipped entirely when the
    # grid is unavailable, since no host could be minted anyway.
    grid = PhantomGridClient()
    oob_ids = set()
    if grid.available:
        candidate_ids = (c.get("cve_id", "") for c in candidates)
        oob_ids = {cid for cid in candidate_ids if cid and self._cve_needs_oob(cid)}

    # Mint one OOB host for any CVE that needs it.
    oob_host = ""
    if oob_ids:
        token = grid.create_token(label=f"nuclei-{target}")
        if token:
            oob_host = grid.capture_url(token)

    results: List[Dict[str, Any]] = []
    for cve in candidates:
        self._check_iteration_limit()
        cve_id = cve.get("cve_id", "")
        if not cve_id:
            continue
        # Reset the variables every iteration so an OOB host never leaks
        # into a run for a CVE that does not need it.
        if oob_host and cve_id in oob_ids:
            engine.extra_vars = {"oob": oob_host}
        else:
            engine.extra_vars = {}
        findings = engine.run(target, cve_id=cve_id)
        results.extend(f.to_dict() for f in findings)
    self._log("nuclei run complete", {"findings": len(results)})
    return results

def _ai_analysis(
self,
target: str,
Expand Down
170 changes: 170 additions & 0 deletions cyberai/agents/exploit/nuclei_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
"""Nuclei exploit engine — CVE/tag-driven template execution (day 23).

Wraps the `nuclei` binary (ProjectDiscovery) via subprocess, runs templates by
CVE id or tags, and parses JSONL output into structured findings. Degrades
gracefully when the binary is absent (available=False, empty results) so CI and
non-nuclei environments never break.
"""

from __future__ import annotations

import json
import logging
import os
import shutil
import subprocess
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

# Module-level logger for the nuclei engine.
logger = logging.getLogger("cyberai.exploit.nuclei")

# Fallback locations checked when nuclei is not on PATH.
# find_nuclei() probes them in list order and returns the first hit.
_FALLBACK_PATHS = [
    os.path.expanduser("~/go/bin/nuclei"),
    "/usr/local/bin/nuclei",
    "/usr/bin/nuclei",
]

# Default subprocess timeout, in seconds, for NucleiEngine.
DEFAULT_TIMEOUT = 300 # nuclei runs can be slow; 5 min ceiling


def find_nuclei() -> Optional[str]:
    """Locate the nuclei binary: env, PATH, then known fallback dirs.

    Lookup order:
      1. ``NUCLEI_PATH`` environment variable, when it names a regular file.
      2. ``nuclei`` resolved on ``PATH`` via ``shutil.which``.
      3. The first regular file among ``_FALLBACK_PATHS``.

    Returns:
        Path to the binary, or None when nothing was found.
    """
    override = os.getenv("NUCLEI_PATH")
    # isfile rather than exists: a directory can never be executed, so a
    # misconfigured NUCLEI_PATH falls through to the PATH lookup instead of
    # being returned and failing on every run.
    if override and os.path.isfile(override):
        return override
    on_path = shutil.which("nuclei")
    if on_path:
        return on_path
    for candidate in _FALLBACK_PATHS:
        if os.path.isfile(candidate):
            return candidate
    return None


@dataclass
class NucleiFinding:
    """Structured view of a single result line emitted by nuclei in JSONL mode."""

    template_id: str
    name: str
    severity: str
    host: str
    matched_at: str
    type: str = ""
    cve_id: Optional[str] = None
    # Full decoded JSON object for the line; not included in to_dict().
    raw: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the finding as a plain dict, leaving out the ``raw`` payload."""
        exported = (
            "template_id",
            "name",
            "severity",
            "host",
            "matched_at",
            "type",
            "cve_id",
        )
        return {key: getattr(self, key) for key in exported}


def _parse_line(line: str) -> Optional[NucleiFinding]:
"""Parse one JSONL line from nuclei into a NucleiFinding."""
line = line.strip()
if not line:
return None
try:
d = json.loads(line)
except json.JSONDecodeError:
return None
info = d.get("info", {}) or {}
classification = info.get("classification") or {}
cve = classification.get("cve-id")
# cve-id can be a list, a string, or null
if isinstance(cve, list):
cve = cve[0] if cve else None
return NucleiFinding(
template_id=d.get("template-id", ""),
name=info.get("name", ""),
severity=info.get("severity", "unknown"),
host=d.get("host", ""),
matched_at=d.get("matched-at", d.get("url", "")),
type=d.get("type", ""),
cve_id=cve,
raw=d,
)


def parse_jsonl(output: str) -> List[NucleiFinding]:
    """Turn raw nuclei JSONL output into findings, one candidate per line.

    Lines for which ``_parse_line`` returns None are dropped.
    """
    parsed = (_parse_line(row) for row in output.splitlines())
    return [finding for finding in parsed if finding is not None]


class NucleiEngine:
    """Runs nuclei templates against a target by CVE id or tags.

    The engine shells out to the nuclei binary and parses its JSONL output.
    When the binary cannot be found, ``available`` is False and ``run``
    returns an empty list instead of raising.
    """

    def __init__(
        self,
        nuclei_path: Optional[str] = None,
        timeout: int = DEFAULT_TIMEOUT,
        extra_vars: Optional[Dict[str, str]] = None,
    ):
        """Configure the engine.

        Args:
            nuclei_path: Explicit binary path; discovered via find_nuclei()
                when omitted.
            timeout: Per-run subprocess timeout in seconds.
            extra_vars: Template variables passed to nuclei as ``-var k=v``.
        """
        self.nuclei_path = nuclei_path or find_nuclei()
        self.timeout = timeout
        # Template variables injected via -var (e.g. OOB callback host).
        self.extra_vars = extra_vars or {}

    @property
    def available(self) -> bool:
        """True when a nuclei binary path is set and names a regular file."""
        # isfile rather than exists: a directory path cannot be executed and
        # would otherwise only fail later inside subprocess.run.
        return bool(self.nuclei_path and os.path.isfile(self.nuclei_path))

    def _base_cmd(self, target: str) -> List[str]:
        """Build the argv shared by every run: output flags plus -var pairs."""
        cmd = [
            self.nuclei_path or "nuclei",
            "-target",
            target,
            "-jsonl",
            "-silent",
            "-no-color",
            "-omit-raw",  # drop request/response — keeps JSON compact
        ]
        for k, v in self.extra_vars.items():
            cmd += ["-var", f"{k}={v}"]
        return cmd

    def run(
        self,
        target: str,
        cve_id: Optional[str] = None,
        tags: Optional[List[str]] = None,
    ) -> List[NucleiFinding]:
        """Run nuclei against target, scoped by CVE id or tags.

        Args:
            target: Target passed to ``-target``.
            cve_id: Template id passed to ``-id`` when given.
            tags: Tags joined with commas and passed to ``-tags`` when given.

        Returns:
            Parsed findings. Returns [] (and logs) when the binary is
            unavailable, the run times out, or the process cannot be
            started. A non-zero exit code is logged together with the tail
            of stderr, and whatever was written to stdout is still parsed.
        """
        if not self.available:
            logger.warning("nuclei binary not found — skipping template run")
            return []
        cmd = self._base_cmd(target)
        if cve_id:
            cmd += ["-id", cve_id]
        if tags:
            cmd += ["-tags", ",".join(tags)]
        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=self.timeout,
            )
        except subprocess.TimeoutExpired:
            logger.warning("nuclei timed out after %ss", self.timeout)
            return []
        except Exception as exc:  # noqa: BLE001 — never hard-fail the pipeline
            logger.warning("nuclei execution failed: %s", exc)
            return []
        if proc.returncode != 0:
            # Without this, a crashed run is indistinguishable from a clean
            # run with no findings. Only the stderr tail is logged.
            logger.warning(
                "nuclei exited with code %s: %s",
                proc.returncode,
                (proc.stderr or "").strip()[-500:],
            )
        return parse_jsonl(proc.stdout)
133 changes: 133 additions & 0 deletions cyberai/agents/exploit/searchsploit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""searchsploit integration — Exploit-DB PoC lookup by CVE/term (day 23).

Wraps the `searchsploit` binary (Exploit-DB). Parses `-j` JSON output into PoC
records. Degrades gracefully when the binary is absent (available=False, empty
results) so environments without exploitdb installed never break.

JSON shape (searchsploit -j <term>):
{"SEARCH": "...", "RESULTS_EXPLOIT": [{"Title","EDB-ID","Path",...}],
"RESULTS_SHELLCODE": [...]}
"""

from __future__ import annotations

import json
import logging
import os
import shutil
import subprocess
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

# Module-level logger for the searchsploit integration.
logger = logging.getLogger("cyberai.exploit.searchsploit")

# Locations probed, in order, by find_searchsploit() when the binary is
# neither given via SEARCHSPLOIT_PATH nor found on PATH.
_FALLBACK_PATHS = [
    "/usr/bin/searchsploit",
    "/opt/exploitdb/searchsploit",
    "/usr/local/bin/searchsploit",
]

# Default subprocess timeout, in seconds, for SearchSploit.
DEFAULT_TIMEOUT = 30


def find_searchsploit() -> Optional[str]:
    """Locate the searchsploit binary: env, PATH, then known fallback dirs.

    Lookup order:
      1. ``SEARCHSPLOIT_PATH`` environment variable, when it names a
         regular file.
      2. ``searchsploit`` resolved on ``PATH`` via ``shutil.which``.
      3. The first regular file among ``_FALLBACK_PATHS``.

    Returns:
        Path to the binary, or None when nothing was found.
    """
    override = os.getenv("SEARCHSPLOIT_PATH")
    # isfile rather than exists: a directory can never be executed, so a
    # misconfigured SEARCHSPLOIT_PATH falls through to the PATH lookup.
    if override and os.path.isfile(override):
        return override
    on_path = shutil.which("searchsploit")
    if on_path:
        return on_path
    for candidate in _FALLBACK_PATHS:
        if os.path.isfile(candidate):
            return candidate
    return None


@dataclass
class ExploitRecord:
    """A single proof-of-concept entry as reported by Exploit-DB."""

    title: str
    edb_id: str
    path: str
    type: str = ""
    platform: str = ""
    date: str = ""
    # Untouched source mapping for the entry; not included in to_dict().
    raw: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict, leaving out the ``raw`` payload."""
        exported = ("title", "edb_id", "path", "type", "platform", "date")
        return {key: getattr(self, key) for key in exported}


def _parse_record(raw: Dict[str, Any]) -> ExploitRecord:
    """Map one searchsploit result object onto an ExploitRecord.

    Missing keys become empty strings; ``Date_Published`` is preferred over
    ``Date`` for the publication date, and the EDB id is always stringified.
    """
    legacy_date = raw.get("Date", "")
    published = raw.get("Date_Published", legacy_date)
    fields = {
        "title": raw.get("Title", ""),
        "edb_id": str(raw.get("EDB-ID", "")),
        "path": raw.get("Path", ""),
        "type": raw.get("Type", ""),
        "platform": raw.get("Platform", ""),
        "date": published,
        "raw": raw,
    }
    return ExploitRecord(**fields)


def parse_output(output: str) -> List[ExploitRecord]:
    """Parse `searchsploit -j` JSON into ExploitRecords (exploits only).

    Args:
        output: Raw stdout of ``searchsploit -j <term>``.

    Returns:
        One record per object in ``RESULTS_EXPLOIT``. Returns [] when the
        output is empty, is not valid JSON, is not a JSON object, or has no
        list under ``RESULTS_EXPLOIT``. Entries that are not objects are
        skipped rather than raising.
    """
    output = output.strip()
    if not output:
        return []
    try:
        data = json.loads(output)
    except json.JSONDecodeError:
        return []
    # Valid JSON is not necessarily an object (e.g. "[]" or "null").
    if not isinstance(data, dict):
        return []
    results = data.get("RESULTS_EXPLOIT")
    if not isinstance(results, list):
        return []
    return [_parse_record(entry) for entry in results if isinstance(entry, dict)]


class SearchSploit:
    """Looks up Exploit-DB entries by shelling out to the searchsploit CLI."""

    def __init__(
        self,
        searchsploit_path: Optional[str] = None,
        timeout: int = DEFAULT_TIMEOUT,
    ):
        """Remember the binary location (auto-discovered if not given) and timeout."""
        resolved = searchsploit_path if searchsploit_path else find_searchsploit()
        self.searchsploit_path = resolved
        self.timeout = timeout

    @property
    def available(self) -> bool:
        """True when a binary path is set and that path exists on disk."""
        binary = self.searchsploit_path
        if not binary:
            return False
        return bool(os.path.exists(binary))

    def search(self, term: str) -> List[ExploitRecord]:
        """Query Exploit-DB for a keyword or CVE id; yields [] when unusable.

        An empty list is returned (after a warning is logged) when the
        binary is missing, the call times out, or the process fails to run.
        """
        if not self.available:
            logger.warning("searchsploit not found — skipping PoC lookup")
            return []
        binary = self.searchsploit_path or "searchsploit"
        argv = [binary, "-j", term]
        try:
            completed = subprocess.run(
                argv,
                capture_output=True,
                text=True,
                timeout=self.timeout,
            )
        except subprocess.TimeoutExpired:
            logger.warning("searchsploit timed out after %ss", self.timeout)
            return []
        except Exception as exc:  # noqa: BLE001 — never hard-fail
            logger.warning("searchsploit execution failed: %s", exc)
            return []
        else:
            return parse_output(completed.stdout)

    def search_cve(self, cve_id: str) -> List[ExploitRecord]:
        """Shorthand for ``search`` when the term is a CVE id."""
        return self.search(cve_id)
Loading
Loading