Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added cyberai/agents/web3/__init__.py
Empty file.
99 changes: 99 additions & 0 deletions cyberai/agents/web3/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""SmartContractAgent — Solidity static analysis & severity triage (day 24).

Standalone agent (not in the recon→intel→exploit→report network pipeline):
takes a contract address or local .sol path, runs static analysis, and triages
findings against Immunefi severity. Etherscan fetch is optional; local source
is the primary path.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, Optional

from rich.console import Console

from cyberai.core.base_agent import BaseAgent, Tool

from .etherscan import EtherscanClient
from .immunefi_severity import classify_all, highest_tier
from .slither_tool import SlitherTool

console = Console()


class SmartContractAgent(BaseAgent):
"""Web3 agent — static analysis of Solidity contracts."""

AGENT_NAME = "web3"
ROLE = "Smart Contract Auditor"

def _register_tools(self) -> None:
self.register_tool(
Tool(
name="fetch_source",
description="Fetch verified contract source from Etherscan",
func=self._fetch_source,
parameters={"address": "str"},
)
)
self.register_tool(
Tool(
name="slither_scan",
description="Static-analyze a Solidity file with slither",
func=self._slither_scan,
parameters={"path": "str"},
)
)

def _fetch_source(self, address: str) -> Dict[str, Any]:
client = EtherscanClient()
src = client.get_source(address)
if src is None:
return {"verified": False, "source_code": ""}
return {
"address": src.address,
"name": src.name,
"verified": src.verified,
"compiler_version": src.compiler_version,
"source_len": len(src.source_code),
}

def _slither_scan(self, path: str) -> Dict[str, Any]:
tool = SlitherTool()
findings = tool.analyze(path)
classified = classify_all(findings)
return {
"available": tool.available,
"findings": classified,
"highest_severity": highest_tier(findings),
"count": len(classified),
}

def run(self, target: str, context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Analyze a contract.

`target` is either a local .sol path or a contract address. Slither
wiring + severity arrive in later commits; this skeleton resolves the
source and records intent.
"""
self._log(f"Smart-contract analysis target: {target}")
is_local = Path(target).exists() and target.endswith(".sol")
result: Dict[str, Any] = {
"target": target,
"mode": "local" if is_local else "address",
"findings": [],
}
if is_local:
scan = self.call_tool("slither_scan", path=target)
result["findings"] = scan["findings"]
result["highest_severity"] = scan["highest_severity"]
result["slither_available"] = scan["available"]
else:
result["source_meta"] = self.call_tool("fetch_source", address=target)
self.kb.set("web3", result, agent=self.AGENT_NAME)
self._log(
"Smart-contract analysis complete",
{"mode": result["mode"], "findings": len(result["findings"])},
)
return result
86 changes: 86 additions & 0 deletions cyberai/agents/web3/etherscan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Etherscan API client — fetch verified source, ABI, status (day 24).

Degrades gracefully when no API key is set (available=False), so the agent
can still analyze local .sol files without an internet/Etherscan dependency.
"""

from __future__ import annotations

import logging
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

import httpx

logger = logging.getLogger("cyberai.web3.etherscan")

DEFAULT_API = "https://api.etherscan.io/api"


@dataclass
class ContractSource:
"""Verified contract metadata from Etherscan."""

address: str
name: str = ""
source_code: str = ""
abi: str = ""
compiler_version: str = ""
verified: bool = False
raw: Dict[str, Any] = field(default_factory=dict)


class EtherscanClient:
"""Minimal Etherscan client for source/ABI retrieval."""

def __init__(
self,
api_key: Optional[str] = None,
base_url: str = DEFAULT_API,
timeout: int = 15,
):
self.api_key = api_key or os.getenv("ETHERSCAN_API_KEY", "")
self.base_url = base_url
self.timeout = timeout

@property
def available(self) -> bool:
return bool(self.api_key)

def get_source(self, address: str) -> Optional[ContractSource]:
"""Fetch verified source for a contract address. None if unavailable."""
if not self.available:
logger.warning("no ETHERSCAN_API_KEY — skipping remote source fetch")
return None
try:
with httpx.Client(timeout=self.timeout) as client:
r = client.get(
self.base_url,
params={
"module": "contract",
"action": "getsourcecode",
"address": address,
"apikey": self.api_key,
},
)
r.raise_for_status()
body = r.json()
except Exception as exc: # noqa: BLE001 — never hard-fail
logger.warning("etherscan fetch failed: %s", exc)
return None

results: List[Dict[str, Any]] = body.get("result", []) or []
if not results or not isinstance(results, list):
return None
item = results[0]
source = item.get("SourceCode", "") or ""
return ContractSource(
address=address,
name=item.get("ContractName", ""),
source_code=source,
abi=item.get("ABI", ""),
compiler_version=item.get("CompilerVersion", ""),
verified=bool(source) and item.get("ABI") != "Contract source code not verified",
raw=item,
)
108 changes: 108 additions & 0 deletions cyberai/agents/web3/immunefi_severity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""Immunefi severity classification for slither findings (day 24).

Maps slither detector checks to Immunefi's severity tiers
(Critical / High / Medium / Low / Insight) following their bug-bounty
severity methodology for smart contracts. A per-check table gives precise
classification; an impact/confidence fallback covers unknown detectors.

Immunefi smart-contract impact reference (paraphrased):
Critical — direct theft/loss/freezing of funds, contract takeover.
High — theft of unclaimed yield, temporary freezing, griefing with cost.
Medium — contract fails to deliver promised value (no fund loss).
Low — minor/contained issues, best-practice deviations.
Insight — informational, no security impact.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
from .slither_tool import SlitherFinding

# Immunefi tiers, ordered high→low for ranking.
IMMUNEFI_TIERS = ["Critical", "High", "Medium", "Low", "Insight"]
_TIER_RANK = {t: i for i, t in enumerate(IMMUNEFI_TIERS)}

# Precise per-check mapping for high-signal slither detectors.
CHECK_TO_IMMUNEFI = {
# direct fund loss / takeover -> Critical
"reentrancy-eth": "Critical",
"arbitrary-send-eth": "Critical",
"arbitrary-send-erc20": "Critical",
"suicidal": "Critical",
"controlled-delegatecall": "Critical",
"unprotected-upgrade": "Critical",
"delegatecall-loop": "Critical",
# exploitable but conditional / no direct theft -> High
"reentrancy-no-eth": "High",
"tx-origin": "High",
"weak-prng": "High",
"incorrect-equality": "High",
"unchecked-transfer": "High",
"controlled-array-length": "High",
# logic/contained -> Medium
"uninitialized-state": "Medium",
"uninitialized-storage": "Medium",
"divide-before-multiply": "Medium",
"reentrancy-benign": "Medium",
"timestamp": "Medium",
"unchecked-lowlevel": "Medium",
"unchecked-send": "Medium",
# best-practice / contained -> Low
"low-level-calls": "Low",
"missing-zero-check": "Low",
"calls-loop": "Low",
"reentrancy-events": "Low",
# informational -> Insight
"solc-version": "Insight",
"pragma": "Insight",
"naming-convention": "Insight",
"dead-code": "Insight",
"assembly": "Insight",
"external-function": "Insight",
}

# Fallback: slither impact + confidence -> Immunefi tier.
_IMPACT_FALLBACK = {
("High", "High"): "Critical",
("High", "Medium"): "High",
("High", "Low"): "High",
("Medium", "High"): "High",
("Medium", "Medium"): "Medium",
("Medium", "Low"): "Medium",
("Low", "High"): "Low",
("Low", "Medium"): "Low",
("Low", "Low"): "Low",
("Informational", "High"): "Insight",
("Informational", "Medium"): "Insight",
("Informational", "Low"): "Insight",
}


def classify(finding: "SlitherFinding") -> str:
"""Return the Immunefi tier for a single slither finding."""
if finding.check in CHECK_TO_IMMUNEFI:
return CHECK_TO_IMMUNEFI[finding.check]
return _IMPACT_FALLBACK.get((finding.impact, finding.confidence), "Insight")


def classify_all(findings: List["SlitherFinding"]) -> List[dict]:
"""Classify findings, attaching an `immunefi_severity` field, sorted high→low."""
rows = []
for f in findings:
d = f.to_dict()
d["immunefi_severity"] = classify(f)
rows.append(d)
rows.sort(key=lambda r: _TIER_RANK.get(r["immunefi_severity"], 99))
return rows


def highest_tier(findings: List["SlitherFinding"]) -> str:
"""Return the most severe Immunefi tier across findings (Insight if none)."""
if not findings:
return "Insight"
return min(
(classify(f) for f in findings),
key=lambda t: _TIER_RANK.get(t, 99),
)
Loading
Loading