Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions cyberai/agents/report/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from pathlib import Path
from typing import Any, Dict, Optional

import json

from cyberai.core.base_agent import BaseAgent, Tool
from cyberai.core.types import ReportSection

from .json_exporter import export_json
from .markdown_renderer import render_markdown
Expand Down Expand Up @@ -58,9 +61,59 @@ def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str
self.kb.set("report.markdown_path", md_path, agent=self.AGENT_NAME)
self.kb.set("report.json_path", json_path, agent=self.AGENT_NAME)

# Flag-gated: LLM-generated structured executive section.
if getattr(self.config, "use_llm_summary", False) and self.llm is not None:
section = self._structured_summary(target)
if section is not None:
self.kb.set("report.section", section.model_dump(), agent=self.AGENT_NAME)

return {
"status": "done",
"markdown": md_path,
"json": json_path,
"total_findings": len(self.session.findings),
}

def _structured_summary(self, target: str):
"""Flag-gated: ask the LLM for a Pydantic-validated ReportSection.

Uses LLMClient.structured_call with ReportSection's JSON Schema; the
provider returns JSON, which we validate. Returns None on any failure
so the deterministic report is never blocked.
"""
if self.llm is None:
return None
findings = [
{
"title": f.title,
"severity": getattr(f.severity, "value", str(f.severity)),
"description": f.description,
}
for f in self.session.findings
]
system = (
"You are a penetration-test report writer. Summarize the findings "
"into one executive ReportSection: a concise title, the highest "
"applicable severity, key findings, concrete recommendations, and "
"a short business impact statement."
)
messages = [
{
"role": "user",
"content": (f"Target: {target}\nFindings JSON:\n{json.dumps(findings, indent=2)}"),
}
]
schema = ReportSection.model_json_schema()
try:
raw = self.llm.structured_call(
messages,
schema=schema,
schema_name="report_section",
description="Executive pentest report section.",
system=system,
agent_name=self.AGENT_NAME,
)
return ReportSection.model_validate(raw)
except Exception as exc: # noqa: BLE001 — report must never hard-fail
self._log(f"LLM structured summary failed: {exc}")
return None
42 changes: 42 additions & 0 deletions cyberai/agents/report/h1_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""HackerOne-compatible Markdown export for a ReportSection (day 20)."""

from __future__ import annotations

from cyberai.core.types import ReportSection

# Map internal severity to HackerOne's severity vocabulary.
_H1_SEVERITY = {
"CRITICAL": "Critical",
"HIGH": "High",
"MEDIUM": "Medium",
"LOW": "Low",
"INFO": "None",
}


def _bullets(items: list[str]) -> str:
"""Render a list as Markdown bullets; placeholder if empty."""
if not items:
return "_None provided._"
return "\n".join(f"- {it}" for it in items)


def export_hackerone(section: ReportSection) -> str:
    """Build a HackerOne-style Markdown submission from a ReportSection.

    The document contains, in order: an H1 title, a bold severity line,
    then the "Steps to Reproduce" (from ``section.findings``), "Impact"
    (from ``section.impact``) and "Recommendation" (from
    ``section.recommendations``) sections. Blocks are separated by one
    blank line and the text ends with a single newline.

    A severity that has no entry in ``_H1_SEVERITY`` is rendered as "None";
    an impact that is empty after stripping whitespace is rendered as an
    italic placeholder.
    """
    h1_severity = _H1_SEVERITY.get(section.severity.upper(), "None")

    impact_text = section.impact.strip()
    if not impact_text:
        impact_text = "_Impact not specified._"

    blocks = [
        f"# {section.title}",
        f"**Severity:** {h1_severity}",
        "## Steps to Reproduce",
        _bullets(section.findings),
        "## Impact",
        impact_text,
        "## Recommendation",
        _bullets(section.recommendations),
    ]
    # Blank line between blocks, single trailing newline at the end.
    return "\n\n".join(blocks) + "\n"
109 changes: 109 additions & 0 deletions cyberai/core/llm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,115 @@ def _call_tools_anthropic(
stop_reason=getattr(response, "stop_reason", None),
)

# ── structured output (sync) ──────────────────────────────────────

def structured_call(
    self,
    messages: List[Dict],
    schema: Dict[str, Any],
    schema_name: str = "response",
    description: str = "",
    system: Optional[str] = None,
    agent_name: str = "unknown",
    cacheable_system: bool = False,
) -> Dict[str, Any]:
    """Request JSON shaped like `schema` from the configured provider.

    Dispatches on ``self.config.provider``:

    * ``"openai"``    -> ``_structured_openai`` (``cacheable_system`` is
      not forwarded).
    * ``"anthropic"`` -> ``_structured_anthropic`` (all arguments forwarded).

    Returns whatever the provider helper returns (the parsed structured
    output). The result is not validated here; the caller is expected to
    validate it, e.g. via pydantic.

    Raises:
        ValueError: for any other provider value.
    """
    # Positional arguments shared by both provider helpers.
    common = (messages, schema, schema_name, description, system, agent_name)

    if self.config.provider == "anthropic":
        return self._structured_anthropic(*common, cacheable_system)
    if self.config.provider == "openai":
        return self._structured_openai(*common)
    raise ValueError(f"Structured output unsupported for provider: {self.config.provider}")

def _structured_openai(
    self, messages, schema, schema_name, description, system, agent_name="unknown"
):
    """Run one OpenAI chat completion constrained by a JSON Schema.

    Args:
        messages: Chat messages; a system message built from `system` is
            prepended when `system` is truthy.
        schema: JSON Schema sent as ``response_format.json_schema.schema``.
        schema_name: Name of the response format.
        description: Optional description of the response format; sent
            only when non-empty.
        system: Optional system prompt.
        agent_name: Label passed to ``self._record_usage``.

    Returns:
        The message content parsed with ``json.loads``; an empty or missing
        content is treated as ``"{}"``. The request uses ``strict=False``
        and nothing is validated here, so callers must validate the result.

    Raises:
        json.JSONDecodeError: if the returned content is not valid JSON.
    """
    import openai

    client = openai.OpenAI(api_key=self.config.api_key)
    full_messages = []
    if system:
        full_messages.append({"role": "system", "content": system})
    full_messages.extend(messages)

    json_schema = {
        "name": schema_name,
        "schema": schema,
        "strict": False,
    }
    if description:
        # Forward the caller's description so the model sees it, matching
        # the Anthropic path, which uses it as the tool description.
        json_schema["description"] = description

    response = client.chat.completions.create(
        model=self.config.model,
        messages=full_messages,
        max_tokens=self.config.max_tokens,
        temperature=self.config.temperature,
        response_format={
            "type": "json_schema",
            "json_schema": json_schema,
        },
    )
    self._record_usage(
        agent_name,
        getattr(response, "model", self.config.model),
        getattr(response.usage, "prompt_tokens", 0),
        getattr(response.usage, "completion_tokens", 0),
    )
    content = response.choices[0].message.content or "{}"
    return json.loads(content)

def _structured_anthropic(
    self,
    messages,
    schema,
    schema_name,
    description,
    system,
    agent_name="unknown",
    cacheable_system=False,
):
    """Get structured output from Anthropic through a single forced tool.

    A tool named `schema_name` with `schema` as its ``input_schema`` is
    declared and selected via ``tool_choice``; the input of the first
    ``tool_use`` block in the response is the structured result.

    `system` is attached only when truthy, and is passed through
    ``_wrap_cacheable`` first when `cacheable_system` is set. Token usage,
    including cache creation/read counts, is reported to
    ``self._record_usage`` under `agent_name`.

    Returns:
        A dict copy of the first ``tool_use`` block's input, or ``{}`` when
        the response contains no such block.
    """
    import anthropic

    forced_tool = {
        "name": schema_name,
        "description": description if description else f"Return a structured {schema_name}.",
        "input_schema": schema,
    }
    request: Dict[str, Any] = {
        "model": self.config.model,
        "max_tokens": self.config.max_tokens,
        "messages": messages,
        "tools": [forced_tool],
        "tool_choice": {"type": "tool", "name": schema_name},
    }
    if system:
        if cacheable_system:
            request["system"] = _wrap_cacheable(system)
        else:
            request["system"] = system

    client = anthropic.Anthropic(api_key=self.config.api_key)
    response = client.messages.create(**request)

    usage = response.usage
    self._record_usage(
        agent_name,
        getattr(response, "model", self.config.model),
        getattr(usage, "input_tokens", 0),
        getattr(usage, "output_tokens", 0),
        # These counters may be absent or None; normalise to 0.
        cache_creation_tokens=getattr(usage, "cache_creation_input_tokens", 0) or 0,
        cache_read_tokens=getattr(usage, "cache_read_input_tokens", 0) or 0,
    )

    for part in response.content:
        if part.type != "tool_use":
            continue
        return dict(part.input)
    return {}

# ── async API ─────────────────────────────────────────────────────

async def acall(
Expand Down
25 changes: 24 additions & 1 deletion cyberai/core/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any, Union
from pathlib import Path

from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator

# Target types
Target = str # IP, CIDR, or domain
Expand Down Expand Up @@ -105,6 +105,29 @@ class ExploitResult(BaseModel):
ReportPath = Path
ReportFormat = str # "markdown" | "html" | "json" | "pdf"

# Severity labels a ReportSection may carry; anything else becomes "INFO".
_VALID_SEVERITIES = {"CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"}


class ReportSection(BaseModel):
    """LLM-generated structured report section (day 20 structured outputs).

    `impact` is included for HackerOne-style export; not in the original
    plan column but required by the H1 template.

    `severity` is always stored as one of `_VALID_SEVERITIES`: input is
    stripped and upper-cased, and any unrecognised, missing (``None``) or
    non-string value falls back to "INFO" instead of failing validation.
    """

    title: str
    severity: str = "INFO"
    findings: list[str] = Field(default_factory=list)
    recommendations: list[str] = Field(default_factory=list)
    impact: str = ""

    # mode="before": run ahead of pydantic's own `str` check so that a null
    # or otherwise non-string severity from the LLM is coerced to "INFO"
    # rather than rejecting the whole section.
    @field_validator("severity", mode="before")
    @classmethod
    def _norm_severity(cls, v: Any) -> str:
        """Normalise a raw severity value to a member of `_VALID_SEVERITIES`."""
        if not isinstance(v, str):
            return "INFO"
        up = v.strip().upper()
        return up if up in _VALID_SEVERITIES else "INFO"


# Pipeline
PipelineInput = Target
PipelineOutput = dict[str, AgentOutput]
Expand Down
Loading
Loading