diff --git a/README.md b/README.md index 1828c46..9d3a03a 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,6 @@ broker order buy AAPL 100 --limit 185 \ - **Autonomous Execution** — Persistent auth keeps sessions alive 24/7. No manual logins, no token expiry interruptions. - **Multi-Broker** — Unified commands across E\*Trade and Interactive Brokers. One skill file, one interface. - **Full Options Support** — Option chains with greeks, expiry filtering, and strike ranges. -- **Risk Guardrails** — Exposure analysis, cancel-all for instant flattening, paper trading mode. Power with built-in safety valves. ## Supported Brokers @@ -88,11 +87,6 @@ broker positions Current positions broker pnl P&L summary broker balance Account balances and margin broker exposure --by symbol Exposure breakdown -broker risk check Pre-trade risk validation -broker risk limits Current risk limits -broker risk set PARAM VALUE Update a risk limit -broker risk halt Emergency halt -broker risk resume Resume after halt broker audit orders Order audit trail broker audit commands Command audit trail ``` diff --git a/cli/README.md b/cli/README.md index d450c0a..1ff423d 100644 --- a/cli/README.md +++ b/cli/README.md @@ -17,7 +17,6 @@ broker positions broker order buy AAPL 5 --limit 180 --tif DAY --idempotency-key strat-42-aapl broker order buy AAPL 5 --limit 180 --dry-run broker chain AAPL --type call --strike-range 0.95:1.05 --limit 100 --fields strike,expiry,bid,ask -broker limits broker audit commands --request-id broker schema quote.snapshot ``` diff --git a/cli/src/_common.py b/cli/src/_common.py index fbf6cad..7e20308 100644 --- a/cli/src/_common.py +++ b/cli/src/_common.py @@ -274,6 +274,5 @@ def _default_suggestion(code: ErrorCode) -> str | None: ErrorCode.IB_DISCONNECTED: "Verify IB Gateway/TWS is running and the gateway host/port are correct.", ErrorCode.INVALID_ARGS: "Run `broker --help` or ` --help` for valid usage.", ErrorCode.TIMEOUT: "Retry the command or increase `runtime.request_timeout_seconds` in config.", - ErrorCode.RISK_HALTED: "Review `broker limits`, then run `broker resume` when appropriate.", } return suggestions.get(code) diff --git a/cli/src/audit.py b/cli/src/audit.py index 1047756..289f6b4 100644 --- a/cli/src/audit.py +++ b/cli/src/audit.py @@ -21,7 +21,6 @@ class AuditSource(str, Enum): class AuditTable(str, Enum): ORDERS = "orders" COMMANDS = "commands" - RISK = "risk" class ExportFormat(str, Enum): @@ -92,30 +91,6 @@ def commands( handle_error(exc, json_output=state.json_output, command=command, strict=state.strict) -@app.command("risk", help="Query risk event audit records.") -def risk( - ctx: typer.Context, - event_type: str | None = typer.Option(None, "--type"), -) -> None: - state = get_state(ctx) - command = "audit.risk" - params: dict[str, object] = {} - if event_type: - params["type"] = event_type - - try: - result = run_async(daemon_request(state, command, params)) - print_output( - result.data.get("risk_events", []), - json_output=state.json_output, - command=command, - request_id=result.request_id, - strict=state.strict, - ) - except BrokerError as exc: - handle_error(exc, json_output=state.json_output, command=command, strict=state.strict) - - @app.command("export", help="Export audit rows to CSV.") def export( ctx: typer.Context, @@ -126,7 +101,6 @@ def export( status: OrderStatusFilter | None = typer.Option(None, "--status", case_sensitive=False, help="Order status filter."), source: AuditSource | None = typer.Option(None, "--source", case_sensitive=False), request_id: str | None = typer.Option(None, "--request-id", help="Command-audit filter by request_id."), - event_type: str | None = typer.Option(None, "--type", help="Risk event type filter."), ) -> None: state = get_state(ctx) command = "audit.export" @@ -139,8 +113,6 @@ def export( params["source"] = source.value if request_id: params["request_id"] = request_id - if event_type: - params["type"] = event_type try: result = run_async(daemon_request(state, command, params)) diff --git a/cli/src/daemon.py b/cli/src/daemon.py index f076a00..bac1178 100644 --- a/cli/src/daemon.py +++ b/cli/src/daemon.py @@ -66,7 +66,7 @@ def stop(ctx: typer.Context) -> None: handle_error(exc, json_output=state.json_output, command=command, strict=state.strict) -@app.command("status", help="Show daemon uptime, IB connection state, and risk halt status.") +@app.command("status", help="Show daemon uptime and connection state.") def status(ctx: typer.Context) -> None: state = get_state(ctx) command = "daemon.status" diff --git a/cli/src/main.py b/cli/src/main.py index 611029e..3182df6 100644 --- a/cli/src/main.py +++ b/cli/src/main.py @@ -9,18 +9,17 @@ import market import orders import portfolio -import risk import schema_cmd import update from _common import CLIState, build_typer, load_config app = build_typer( - """Broker command-line interface for trading, portfolio, risk, and audit workflows. + """Broker command-line interface for trading, portfolio, and audit workflows. Examples: broker quote AAPL MSFT broker order buy AAPL 10 --limit 180 - broker check --side buy --symbol AAPL --qty 50 + broker positions """ ) @@ -28,7 +27,6 @@ app.add_typer(market.app) app.add_typer(orders.order_app, name="order") app.add_typer(portfolio.app) -app.add_typer(risk.app) app.add_typer(audit.app, name="audit") # Flat commands required by spec. diff --git a/cli/src/portfolio.py b/cli/src/portfolio.py index 910df69..d7586ea 100644 --- a/cli/src/portfolio.py +++ b/cli/src/portfolio.py @@ -121,7 +121,7 @@ def exposure( handle_error(exc, json_output=state.json_output, command=command, strict=state.strict) -@app.command("snapshot", help="Fetch quote/portfolio/risk state in one request.") +@app.command("snapshot", help="Fetch quote and portfolio state in one request.") def snapshot( ctx: typer.Context, symbols: str | None = typer.Option( diff --git a/cli/src/risk.py b/cli/src/risk.py deleted file mode 100644 index 3540b3b..0000000 --- a/cli/src/risk.py +++ /dev/null @@ -1,154 +0,0 @@ -"""Risk-management commands.""" - -from __future__ import annotations - -import typer - -from _common import build_typer, daemon_request, get_state, handle_error, print_output, run_async -from broker_daemon.exceptions import BrokerError -from broker_daemon.models.orders import Side, TIF -from broker_daemon.risk.limits import mutable_params - -app = build_typer("Risk checks, limits, controls, and temporary overrides.") - -RISK_PARAMS = tuple(mutable_params()) - - -def _validate_risk_param(value: str) -> str: - normalized = value.strip().lower() - if normalized in RISK_PARAMS: - return normalized - valid = ", ".join(RISK_PARAMS) - raise typer.BadParameter(f"unknown risk parameter '{value}'. valid params: {valid}") - - -@app.command("check", help="Dry-run an order against risk limits (does not submit).") -def check( - ctx: typer.Context, - side: Side = typer.Option(..., "--side", case_sensitive=False, help="buy or sell."), - symbol: str = typer.Option(..., "--symbol", help="Ticker symbol."), - qty: float = typer.Option(..., "--qty", min=0.000001, help="Quantity to evaluate."), - limit: float | None = typer.Option(None, "--limit", help="Limit price."), - stop: float | None = typer.Option(None, "--stop", help="Stop trigger price."), - tif: TIF = typer.Option(TIF.DAY, "--tif", case_sensitive=False, help="DAY, GTC, IOC."), -) -> None: - state = get_state(ctx) - command = "risk.check" - params: dict[str, object] = {"side": side.value, "symbol": symbol, "qty": qty, "tif": tif.value} - if limit is not None: - params["limit"] = limit - if stop is not None: - params["stop"] = stop - - try: - result = run_async(daemon_request(state, command, params)) - print_output( - result.data, - json_output=state.json_output, - command=command, - request_id=result.request_id, - strict=state.strict, - ) - except BrokerError as exc: - handle_error(exc, json_output=state.json_output, command=command, strict=state.strict) - - -@app.command("limits", help="Show current runtime risk limits.") -def limits(ctx: typer.Context) -> None: - state = get_state(ctx) - command = "risk.limits" - try: - result = run_async(daemon_request(state, command, {})) - print_output( - result.data.get("limits", result.data), - json_output=state.json_output, - command=command, - request_id=result.request_id, - strict=state.strict, - ) - except BrokerError as exc: - handle_error(exc, json_output=state.json_output, command=command, strict=state.strict) - - -@app.command("set", help="Update a risk limit parameter at runtime.") -def set_limit( - ctx: typer.Context, - param: str = typer.Argument(..., callback=_validate_risk_param), - value: str = typer.Argument(..., help="New parameter value."), -) -> None: - state = get_state(ctx) - command = "risk.set" - try: - result = run_async(daemon_request(state, command, {"param": param, "value": value})) - print_output( - result.data.get("limits", result.data), - json_output=state.json_output, - command=command, - request_id=result.request_id, - strict=state.strict, - ) - except BrokerError as exc: - handle_error(exc, json_output=state.json_output, command=command, strict=state.strict) - - -@app.command("halt", help="Emergency halt: cancel open orders and reject new orders.") -def halt(ctx: typer.Context) -> None: - state = get_state(ctx) - command = "risk.halt" - try: - result = run_async(daemon_request(state, command, {})) - print_output( - result.data, - json_output=state.json_output, - command=command, - request_id=result.request_id, - strict=state.strict, - ) - except BrokerError as exc: - handle_error(exc, json_output=state.json_output, command=command, strict=state.strict) - - -@app.command("resume", help="Resume trading after a risk halt.") -def resume(ctx: typer.Context) -> None: - state = get_state(ctx) - command = "risk.resume" - try: - result = run_async(daemon_request(state, command, {})) - print_output( - result.data, - json_output=state.json_output, - command=command, - request_id=result.request_id, - strict=state.strict, - ) - except BrokerError as exc: - handle_error(exc, json_output=state.json_output, command=command, strict=state.strict) - - -@app.command("override", help="Apply a temporary risk override with required reason and duration.") -def override( - ctx: typer.Context, - param: str = typer.Option(..., "--param", callback=_validate_risk_param), - value: str = typer.Option(..., "--value", help="Override value."), - duration: str = typer.Option(..., "--duration", help="Duration like 30m, 1h, 1d."), - reason: str = typer.Option(..., "--reason", help="Required audit reason for the override."), -) -> None: - state = get_state(ctx) - command = "risk.override" - try: - result = run_async( - daemon_request( - state, - command, - {"param": param, "value": value, "duration": duration, "reason": reason}, - ) - ) - print_output( - result.data.get("override", result.data), - json_output=state.json_output, - command=command, - request_id=result.request_id, - strict=state.strict, - ) - except BrokerError as exc: - handle_error(exc, json_output=state.json_output, command=command, strict=state.strict) diff --git a/cli/tests/test_cli/test_command_contracts.py b/cli/tests/test_cli/test_command_contracts.py index a710a2a..2e90352 100644 --- a/cli/tests/test_cli/test_command_contracts.py +++ b/cli/tests/test_cli/test_command_contracts.py @@ -11,7 +11,6 @@ import market import orders import portfolio -import risk import schema_cmd from main import app @@ -81,21 +80,14 @@ async def fake_daemon_request(_: Any, command: str, params: dict[str, Any] | Non "order.cancel": {"ok": True}, "orders.cancel_all": {"cancelled": 0}, "fills.list": {"fills": []}, - "risk.check": {"ok": True}, - "risk.limits": {"limits": {"max_order_value": 50000}}, - "risk.set": {"limits": {"max_order_value": 1000}}, - "risk.halt": {"halted": True}, - "risk.resume": {"halted": False}, - "risk.override": {"override": {"param": "max_order_value", "value": 1000}}, "audit.commands": {"commands": []}, "audit.orders": {"orders": []}, - "audit.risk": {"risk_events": []}, "audit.export": {"output": "/tmp/audit.csv", "rows": 0}, "schema.get": {"schema_version": "v1", "commands": {}}, } return FakeRPCResult(command, responses.get(command, {"ok": True})) - for mod in (audit, daemon, market, orders, portfolio, risk, schema_cmd): + for mod in (audit, daemon, market, orders, portfolio, schema_cmd): monkeypatch.setattr(mod, "daemon_request", fake_daemon_request) return calls @@ -122,12 +114,6 @@ def test_root_command_surface_contract(runner: CliRunner) -> None: "pnl", "balance", "exposure", - "check", - "limits", - "set", - "halt", - "resume", - "override", "snapshot", "audit", "schema", @@ -139,7 +125,7 @@ def test_root_command_surface_contract(runner: CliRunner) -> None: [ (["daemon", "--help"], {"start", "stop", "status", "restart"}), (["order", "--help"], {"buy", "sell", "bracket", "status"}), - (["audit", "--help"], {"orders", "commands", "risk", "export"}), + (["audit", "--help"], {"orders", "commands", "export"}), ], ) def test_subcommand_surface_contract( @@ -220,15 +206,8 @@ def test_subcommand_surface_contract( (["balance"], "portfolio.balance"), (["exposure"], "portfolio.exposure"), (["snapshot"], "portfolio.snapshot"), - (["check", "--side", "buy", "--symbol", "AAPL", "--qty", "1"], "risk.check"), - (["limits"], "risk.limits"), - (["set", "max_order_value", "1000"], "risk.set"), - (["halt"], "risk.halt"), - (["resume"], "risk.resume"), - (["override", "--param", "max_order_value", "--value", "1000", "--duration", "1h", "--reason", "test"], "risk.override"), (["audit", "orders"], "audit.orders"), (["audit", "commands"], "audit.commands"), - (["audit", "risk"], "audit.risk"), (["audit", "export", "--output", "/tmp/audit.csv"], "audit.export"), (["schema"], "schema.get"), (["daemon", "status"], "daemon.status"), diff --git a/daemon/README.md b/daemon/README.md index 529aee1..df61311 100644 --- a/daemon/README.md +++ b/daemon/README.md @@ -1,12 +1,11 @@ # Broker Daemon -`broker-daemon` is the long-running backend for broker execution and risk enforcement. +`broker-daemon` is the long-running backend for broker execution. ## Responsibilities - maintain session state to IB Gateway/TWS - route requests from CLI/SDK clients -- enforce risk controls before execution - persist audit events for traceability ## Interfaces diff --git a/daemon/src/broker_daemon/audit/logger.py b/daemon/src/broker_daemon/audit/logger.py index ac6af88..4d08232 100644 --- a/daemon/src/broker_daemon/audit/logger.py +++ b/daemon/src/broker_daemon/audit/logger.py @@ -107,16 +107,15 @@ async def upsert_order(self, record: OrderRecord) -> None: INSERT INTO orders ( client_order_id, ib_order_id, symbol, side, qty, order_type, limit_price, stop_price, tif, status, submitted_at, filled_at, fill_price, fill_qty, - commission, risk_check_result - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + commission + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(client_order_id) DO UPDATE SET ib_order_id = excluded.ib_order_id, status = excluded.status, filled_at = excluded.filled_at, fill_price = excluded.fill_price, fill_qty = excluded.fill_qty, - commission = excluded.commission, - risk_check_result = excluded.risk_check_result + commission = excluded.commission """, ( record.client_order_id, @@ -134,7 +133,6 @@ async def upsert_order(self, record: OrderRecord) -> None: record.fill_price, record.fill_qty, record.commission, - json.dumps(record.risk_check_result, sort_keys=True), ), ) @@ -157,12 +155,6 @@ async def log_fill(self, fill: FillRecord) -> None: ), ) - async def log_risk_event(self, event_type: str, details: dict[str, Any]) -> None: - await self._execute( - "INSERT INTO risk_events (timestamp, event_type, details) VALUES (?, ?, ?)", - (datetime.now(UTC).isoformat(), event_type, json.dumps(details, sort_keys=True)), - ) - async def log_connection_event(self, event: str, details: dict[str, Any]) -> None: await self._execute( "INSERT INTO connection_events (timestamp, event, details) VALUES (?, ?, ?)", diff --git a/daemon/src/broker_daemon/audit/query.py b/daemon/src/broker_daemon/audit/query.py index 39ae63a..a53fc95 100644 --- a/daemon/src/broker_daemon/audit/query.py +++ b/daemon/src/broker_daemon/audit/query.py @@ -55,18 +55,6 @@ async def query_orders( ) -async def query_risk_events( - logger: AuditLogger, - *, - event_type: str | None = None, -) -> list[dict[str, Any]]: - where, values = _where_clause({"event_type": event_type}) - return await logger.fetch_all( - f"SELECT timestamp, event_type, details FROM risk_events {where} ORDER BY id DESC", - tuple(values), - ) - - def export_rows_to_csv(rows: list[dict[str, Any]], path: Path) -> None: path.parent.mkdir(parents=True, exist_ok=True) if not rows: diff --git a/daemon/src/broker_daemon/audit/schema.py b/daemon/src/broker_daemon/audit/schema.py index edad130..dab2c80 100644 --- a/daemon/src/broker_daemon/audit/schema.py +++ b/daemon/src/broker_daemon/audit/schema.py @@ -49,14 +49,6 @@ ) """, """ - CREATE TABLE IF NOT EXISTS risk_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - timestamp TEXT NOT NULL, - event_type TEXT NOT NULL, - details TEXT - ) - """, - """ CREATE TABLE IF NOT EXISTS connection_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, diff --git a/daemon/src/broker_daemon/config.py b/daemon/src/broker_daemon/config.py index 4003e63..05100a3 100644 --- a/daemon/src/broker_daemon/config.py +++ b/daemon/src/broker_daemon/config.py @@ -42,19 +42,6 @@ class ETradeConfig(BaseModel): persistent_auth: bool = False -class RiskConfig(BaseModel): - max_position_pct: float = 10.0 - max_order_value: float = 50_000.0 - max_daily_loss_pct: float = 2.0 - max_sector_exposure_pct: float = 30.0 - max_single_name_pct: float = 10.0 - max_open_orders: int = 20 - order_rate_limit: int = 10 - duplicate_window_seconds: int = 60 - symbol_allowlist: list[str] = Field(default_factory=list) - symbol_blocklist: list[str] = Field(default_factory=list) - - class LoggingConfig(BaseModel): level: str = "INFO" audit_db: Path = DEFAULT_STATE_HOME / "audit.db" @@ -63,8 +50,6 @@ class LoggingConfig(BaseModel): class AgentConfig(BaseModel): - heartbeat_timeout_seconds: int = 300 - on_heartbeat_timeout: str = "warn" default_output: str = "json" @@ -136,7 +121,6 @@ class AppConfig(BaseModel): provider: str = "ib" gateway: GatewayConfig = Field(default_factory=GatewayConfig) etrade: ETradeConfig = Field(default_factory=ETradeConfig) - risk: RiskConfig = Field(default_factory=RiskConfig) logging: LoggingConfig = Field(default_factory=LoggingConfig) agent: AgentConfig = Field(default_factory=AgentConfig) output: OutputConfig = Field(default_factory=OutputConfig) @@ -211,7 +195,7 @@ def _read_broker_json(path: Path) -> dict[str, Any]: def _extract_broker_config(data: dict[str, Any]) -> dict[str, Any]: out: dict[str, Any] = {} raw_broker = data.get("broker") - sections = {"gateway", "etrade", "risk", "logging", "agent", "output", "runtime", "market_data", "observability"} + sections = {"gateway", "etrade", "logging", "agent", "output", "runtime", "market_data", "observability"} if isinstance(raw_broker, dict): provider = raw_broker.get("provider") @@ -234,7 +218,7 @@ def _extract_broker_config(data: dict[str, Any]) -> dict[str, Any]: def _apply_env_overrides(data: dict[str, Any]) -> dict[str, Any]: result = dict(data) - sections = {"gateway", "etrade", "risk", "logging", "agent", "output", "runtime", "market_data", "observability"} + sections = {"gateway", "etrade", "logging", "agent", "output", "runtime", "market_data", "observability"} for key, raw in os.environ.items(): if key == "BROKER_PROVIDER": result["provider"] = raw.strip() diff --git a/daemon/src/broker_daemon/daemon/order_manager.py b/daemon/src/broker_daemon/daemon/order_manager.py index 94e95df..3a091f1 100644 --- a/daemon/src/broker_daemon/daemon/order_manager.py +++ b/daemon/src/broker_daemon/daemon/order_manager.py @@ -1,4 +1,4 @@ -"""Order state machine and risk-enforced order entry.""" +"""Order state machine and order entry.""" from __future__ import annotations @@ -11,7 +11,6 @@ from broker_daemon.models.orders import FillRecord, OrderRecord, OrderRequest, OrderStatus, OrderType, Side from broker_daemon.observability.fund_sync import FundSyncService from broker_daemon.providers import BrokerProvider -from broker_daemon.risk.engine import RiskContext, RiskEngine ACTIVE_STATUSES = { OrderStatus.SUBMITTED, @@ -26,13 +25,11 @@ def __init__( self, *, provider: BrokerProvider, - risk: RiskEngine, audit: AuditLogger, event_cb: Callable[[Event], Awaitable[None]] | None = None, fund_sync: FundSyncService | None = None, ) -> None: self._provider = provider - self._risk = risk self._audit = audit self._event_cb = event_cb self._fund_sync = fund_sync @@ -49,44 +46,11 @@ def _infer_order_type(self, req: OrderRequest) -> OrderType: return OrderType.STOP return OrderType.MARKET - async def _risk_context(self) -> RiskContext: - balance = await self._provider.balance() - nlv = float(balance.net_liquidation or 0.0) - positions = await self._provider.positions() - - symbols = [p.symbol for p in positions] - quotes = await self._provider.quote(symbols) if symbols else [] - marks = { - q.symbol: (q.last if q.last is not None else q.bid if q.bid is not None else q.ask if q.ask is not None else 0.0) - for q in quotes - } - - position_values: dict[str, float] = {} - for p in positions: - mark = marks.get(p.symbol) - if mark is None: - mark = p.market_price if p.market_price is not None else p.avg_cost - position_values[p.symbol] = float(mark) * p.qty - - pnl = await self._provider.pnl() - open_orders = len([o for o in self._orders.values() if o.status in ACTIVE_STATUSES]) - - return RiskContext( - nlv=nlv, - daily_pnl=pnl.total, - open_orders=open_orders, - mark_prices=marks, - position_values=position_values, - ) - async def place_order(self, request: OrderRequest) -> OrderRecord: client_order_id = request.client_order_id or str(uuid.uuid4()) if client_order_id in self._orders: return self._orders[client_order_id] - context = await self._risk_context() - risk_result = self._risk.assert_order(request, context) - record = OrderRecord( client_order_id=client_order_id, symbol=request.symbol, @@ -97,7 +61,6 @@ async def place_order(self, request: OrderRequest) -> OrderRecord: stop_price=request.stop, tif=request.tif, status=OrderStatus.PENDING_SUBMIT, - risk_check_result=risk_result.model_dump(mode="json"), tags=dict(request.tags), ) @@ -108,7 +71,6 @@ async def place_order(self, request: OrderRequest) -> OrderRecord: self._orders[client_order_id] = record await self._audit.upsert_order(record) - await self._audit.log_risk_event("check_passed", {"client_order_id": client_order_id}) await self._emit( Event( @@ -149,10 +111,6 @@ async def place_bracket( tif: str, decision: dict[str, str] | None = None, ) -> dict[str, Any]: - request = OrderRequest(side=side, symbol=symbol, qty=qty, limit=entry, tif=tif) - context = await self._risk_context() - self._risk.assert_order(request, context) - client_order_id = str(uuid.uuid4()) result = await self._provider.place_bracket( side=side, @@ -164,7 +122,6 @@ async def place_bracket( tif=tif, client_order_id=client_order_id, ) - await self._audit.log_risk_event("check_passed", {"client_order_id": client_order_id, "type": "bracket"}) await self._emit( Event( topic=EventTopic.ORDERS, diff --git a/daemon/src/broker_daemon/daemon/server.py b/daemon/src/broker_daemon/daemon/server.py index af22bf1..da47166 100644 --- a/daemon/src/broker_daemon/daemon/server.py +++ b/daemon/src/broker_daemon/daemon/server.py @@ -17,7 +17,7 @@ from pydantic import ValidationError from broker_daemon.audit.logger import AuditLogger -from broker_daemon.audit.query import export_rows_to_csv, query_commands, query_orders, query_risk_events +from broker_daemon.audit.query import export_rows_to_csv, query_commands, query_orders from broker_daemon.config import AppConfig, load_config from broker_daemon.daemon.market_data import MarketDataService from broker_daemon.daemon.order_manager import OrderManager @@ -28,8 +28,6 @@ from broker_daemon.observability import FundSyncService from broker_daemon.protocol import ErrorResponse, EventEnvelope, Request, Response, decode_request, encode_model, frame_payload, read_framed from broker_daemon.providers import IBProvider -from broker_daemon.risk.engine import RiskEngine -from broker_daemon.risk.monitor import ConnectionLossMonitor, HeartbeatMonitor logger = logging.getLogger(__name__) @@ -52,17 +50,10 @@ "order.cancel", "orders.cancel_all", "fills.list", - "risk.check", - "risk.limits", - "risk.set", - "risk.halt", - "risk.resume", - "risk.override", "runtime.keepalive", "events.subscribe", "audit.commands", "audit.orders", - "audit.risk", "audit.export", "schema.get", ) @@ -84,9 +75,6 @@ def __init__(self, cfg: AppConfig) -> None: self._shutdown = asyncio.Event() self._audit = AuditLogger(cfg.logging.audit_db) - self._risk = RiskEngine(cfg.risk) - self._heartbeat = HeartbeatMonitor(cfg.agent.heartbeat_timeout_seconds) - self._connection_loss = ConnectionLossMonitor(threshold_seconds=30) if cfg.provider == "etrade": from broker_daemon.providers.etrade import ETradeProvider @@ -99,7 +87,6 @@ def __init__(self, cfg: AppConfig) -> None: self._market_data = MarketDataService(self._provider, settings=cfg.market_data) self._orders = OrderManager( provider=self._provider, - risk=self._risk, audit=self._audit, event_cb=self._broadcast_event, fund_sync=self._fund_sync, @@ -473,8 +460,6 @@ async def _dispatch(self, request: Request) -> dict[str, Any]: "pnl": pnl.model_dump(mode="json"), "exposure": exposure_rows, "exposure_by": exposure_by, - "risk_limits": self._risk.snapshot().model_dump(mode="json"), - "risk_halted": self._risk.halted, "connection": self._provider.status().model_dump(mode="json"), "provider_capabilities": provider_capabilities.model_dump(mode="json"), "provider_capabilities_cache": capabilities_cache, @@ -494,36 +479,16 @@ async def _dispatch(self, request: Request) -> dict[str, Any]: req = OrderRequest.model_validate(raw) if dry_run: - context = await self._orders._risk_context() # noqa: SLF001 - risk_result = self._risk.check_order(req, context) - event_type = "check_passed" if risk_result.ok else "check_failed" - await self._audit.log_risk_event( - event_type, - { - "dry_run": True, - "symbol": req.symbol, - "side": req.side.value, - "qty": req.qty, - **risk_result.model_dump(mode="json"), - }, - ) - preview_order = _build_dry_run_order_preview( - req, - risk_result=risk_result.model_dump(mode="json"), - ) + preview_order = _build_dry_run_order_preview(req) return { "order": preview_order, "dry_run": True, - "risk_check": risk_result.model_dump(mode="json"), - "submit_allowed": bool(risk_result.ok), } record = await self._orders.place_order(req) return { "order": record.model_dump(mode="json"), "dry_run": False, - "risk_check": record.risk_check_result, - "submit_allowed": True, } if cmd == "order.bracket": @@ -583,55 +548,7 @@ async def _dispatch(self, request: Request) -> dict[str, Any]: rows = [r for r in rows if str(r.get("timestamp", "")) >= str(since)] return {"fills": rows} - if cmd == "risk.check": - req = OrderRequest.model_validate(p) - context = await self._orders._risk_context() # noqa: SLF001 - result = self._risk.check_order(req, context) - event_type = "check_passed" if result.ok else "check_failed" - await self._audit.log_risk_event(event_type, result.model_dump(mode="json")) - return result.model_dump(mode="json") - - if cmd == "risk.limits": - return {"limits": self._risk.snapshot().model_dump(mode="json")} - - if cmd == "risk.set": - try: - snapshot = self._risk.set_limit(str(p["param"]), p["value"]) - except ValueError as exc: - raise BrokerError(ErrorCode.INVALID_ARGS, str(exc)) from exc - await self._audit.log_risk_event("set", {"param": p["param"], "value": p["value"]}) - return {"limits": snapshot.model_dump(mode="json")} - - if cmd == "risk.halt": - self._risk.halt() - await self._orders.cancel_all() - await self._audit.log_risk_event("halt", {"source": request.source}) - await self._broadcast_event(Event(topic=EventTopic.RISK, payload={"event": "halt"})) - return {"halted": True} - - if cmd == "risk.resume": - self._risk.resume() - await self._audit.log_risk_event("resume", {"source": request.source}) - await self._broadcast_event(Event(topic=EventTopic.RISK, payload={"event": "resume"})) - return {"halted": False} - - if cmd == "risk.override": - try: - duration = str(p.get("duration", "1h")) - seconds = self._risk.parse_duration(duration) - override = self._risk.override_limit( - param=str(p["param"]), - value=p["value"], - duration_seconds=seconds, - reason=str(p.get("reason", "manual override")), - ) - except ValueError as exc: - raise BrokerError(ErrorCode.INVALID_ARGS, str(exc)) from exc - await self._audit.log_risk_event("override", override.model_dump(mode="json")) - return {"override": override.model_dump(mode="json")} - if cmd == "runtime.keepalive": - self._heartbeat.beat() sent_at = p.get("sent_at") latency_ms = None if sent_at is not None: @@ -643,7 +560,6 @@ async def _dispatch(self, request: Request) -> dict[str, Any]: "ok": True, "latency_ms": latency_ms, "connected": self._provider.is_connected, - "halted": self._risk.halted, } if cmd == "audit.commands": @@ -659,10 +575,6 @@ async def _dispatch(self, request: Request) -> dict[str, Any]: rows = await query_orders(self._audit, status=p.get("status"), since=p.get("since")) return {"orders": rows} - if cmd == "audit.risk": - rows = await query_risk_events(self._audit, event_type=p.get("type")) - return {"risk_events": rows} - if cmd == "audit.export": target = Path(str(p["output"])).expanduser() fmt = str(p.get("format", "csv")) @@ -677,8 +589,6 @@ async def _dispatch(self, request: Request) -> dict[str, Any]: since=p.get("since"), request_id=p.get("request_id"), ) - elif table == "risk": - rows = await query_risk_events(self._audit, event_type=p.get("type")) else: rows = await query_orders(self._audit, status=p.get("status"), since=p.get("since")) export_rows_to_csv(rows, target) @@ -696,19 +606,11 @@ async def _cmd_daemon_status(self) -> dict[str, Any]: "uptime_seconds": round(time.monotonic() - self._start_monotonic, 3), "connection": status.model_dump(mode="json"), "provider_capabilities": dict(self._provider.capabilities), - "risk_halted": self._risk.halted, "time_sync_delta_ms": None, "socket": str(self.socket_path), } async def _on_broker_event(self, event: Event) -> None: - if event.topic == EventTopic.CONNECTION: - label = str(event.payload.get("event", "")) - if label == "connected": - self._connection_loss.on_connected() - if label == "disconnected": - self._connection_loss.on_disconnected() - if event.topic == EventTopic.ORDERS: client_order_id = event.payload.get("client_order_id") status = event.payload.get("status") @@ -764,53 +666,15 @@ async def _monitor_loop(self) -> None: while not self._shutdown.is_set(): await asyncio.sleep(5) - if self._connection_loss.breached() and not self._risk.halted: - self._risk.halt() - await self._audit.log_risk_event("halt", {"reason": "connection_loss"}) - await self._broadcast_event(Event(topic=EventTopic.RISK, payload={"event": "halt", "reason": "connection_loss"})) - - if self._heartbeat.is_timed_out(): - seconds = self._heartbeat.seconds_since_last() - await self._audit.log_risk_event("heartbeat_timeout", {"seconds_since_last": seconds}) - if self._cfg.agent.on_heartbeat_timeout == "halt" and not self._risk.halted: - self._risk.halt() - await self._broadcast_event( - Event(topic=EventTopic.RISK, payload={"event": "halt", "reason": "heartbeat_timeout"}) - ) - if self._provider.is_connected: health_ok = await self._provider.check_health() if not health_ok: - self._connection_loss.on_disconnected() await self._broadcast_event( Event( topic=EventTopic.CONNECTION, payload={"event": "disconnected", "reason": "health_check_failed"}, ) ) - continue - - if self._provider.is_connected: - try: - balance = await self._provider.balance() - pnl = await self._provider.pnl() - nlv = float(balance.net_liquidation or 0.0) - breached, loss_pct = self._risk.check_drawdown_breaker(pnl.total, nlv) - if breached and not self._risk.halted: - self._risk.halt() - await self._audit.log_risk_event( - "halt", - { - "reason": "drawdown_breaker", - "daily_pnl": pnl.total, - "loss_pct": loss_pct, - }, - ) - await self._broadcast_event( - Event(topic=EventTopic.RISK, payload={"event": "halt", "reason": "drawdown_breaker"}) - ) - except Exception: - logger.debug("drawdown monitor skipped due to transient error", exc_info=True) async def _fills_reconcile_loop(self) -> None: interval = self._cfg.observability.etrade_fill_poll_seconds @@ -908,7 +772,7 @@ def _parse_chain_fields(raw: Any) -> list[str] | None: return list(dict.fromkeys(values)) -def _build_dry_run_order_preview(req: OrderRequest, *, risk_result: dict[str, Any]) -> dict[str, Any]: +def _build_dry_run_order_preview(req: OrderRequest) -> dict[str, Any]: if req.limit is not None and req.stop is not None: order_type = "stop_limit" elif req.limit is not None: @@ -918,7 +782,6 @@ def _build_dry_run_order_preview(req: OrderRequest, *, risk_result: dict[str, An else: order_type = "market" - status = "DryRunAccepted" if bool(risk_result.get("ok")) else "DryRunRejected" client_order_id = req.client_order_id or f"dryrun-{int(time.time() * 1000)}" return { "client_order_id": client_order_id, @@ -930,13 +793,12 @@ def _build_dry_run_order_preview(req: OrderRequest, *, risk_result: dict[str, An "limit_price": req.limit, "stop_price": req.stop, "tif": req.tif.value, - "status": status, + "status": "DryRunPreview", "submitted_at": datetime.now(UTC).isoformat(), "filled_at": None, "fill_price": None, "fill_qty": 0.0, "commission": None, - "risk_check_result": risk_result, } @@ -1130,8 +992,6 @@ def _command_schema_registry() -> dict[str, dict[str, Any]]: "positions", "balance", "pnl", - "risk_limits", - "risk_halted", "connection", ], }, @@ -1163,10 +1023,8 @@ def _command_schema_registry() -> dict[str, dict[str, Any]]: "properties": { "order": {"type": "object"}, "dry_run": {"type": "boolean"}, - "risk_check": {"type": "object"}, - "submit_allowed": {"type": "boolean"}, }, - "required": ["order", "dry_run", "risk_check", "submit_allowed"], + "required": ["order", "dry_run"], }, } @@ -1191,23 +1049,6 @@ def _command_schema_registry() -> dict[str, dict[str, Any]]: "result": {"type": "object", "additionalProperties": True}, } - base["risk.check"] = { - "params": { - "type": "object", - "additionalProperties": False, - "properties": { - "side": {"enum": ["buy", "sell"]}, - "symbol": {"type": "string"}, - "qty": {"type": "number", "exclusiveMinimum": 0}, - "limit": {"type": "number"}, - "stop": {"type": "number"}, - "tif": {"enum": ["DAY", "GTC", "IOC"]}, - }, - "required": ["side", "symbol", "qty"], - }, - "result": {"type": "object", "additionalProperties": True}, - } - base["audit.commands"] = { "params": { "type": "object", diff --git a/daemon/src/broker_daemon/exceptions.py b/daemon/src/broker_daemon/exceptions.py index 52150db..e52c3e4 100644 --- a/daemon/src/broker_daemon/exceptions.py +++ b/daemon/src/broker_daemon/exceptions.py @@ -10,10 +10,7 @@ class ErrorCode(str, Enum): DAEMON_NOT_RUNNING = "DAEMON_NOT_RUNNING" IB_DISCONNECTED = "IB_DISCONNECTED" IB_REJECTED = "IB_REJECTED" - RISK_CHECK_FAILED = "RISK_CHECK_FAILED" - RISK_HALTED = "RISK_HALTED" RATE_LIMITED = "RATE_LIMITED" - DUPLICATE_ORDER = "DUPLICATE_ORDER" INVALID_SYMBOL = "INVALID_SYMBOL" INVALID_ARGS = "INVALID_ARGS" TIMEOUT = "TIMEOUT" @@ -24,8 +21,6 @@ class ErrorCode(str, Enum): ErrorCode.INVALID_ARGS: 2, ErrorCode.DAEMON_NOT_RUNNING: 3, ErrorCode.IB_DISCONNECTED: 4, - ErrorCode.RISK_CHECK_FAILED: 5, - ErrorCode.RISK_HALTED: 6, ErrorCode.TIMEOUT: 10, } diff --git a/daemon/src/broker_daemon/models/__init__.py b/daemon/src/broker_daemon/models/__init__.py index f8c88e5..f6b5350 100644 --- a/daemon/src/broker_daemon/models/__init__.py +++ b/daemon/src/broker_daemon/models/__init__.py @@ -4,7 +4,6 @@ from broker_daemon.models.market import Bar, OptionChain, OptionChainEntry, Quote from broker_daemon.models.orders import FillRecord, OrderRecord, OrderRequest, OrderStatus, OrderType, Side, TIF from broker_daemon.models.portfolio import Balance, ExposureEntry, PnLSummary, Position -from broker_daemon.models.risk import RiskCheckResult, RiskConfigSnapshot, RiskOverride __all__ = [ "Bar", @@ -22,9 +21,6 @@ "PnLSummary", "Position", "Quote", - "RiskCheckResult", - "RiskConfigSnapshot", - "RiskOverride", "Side", "TIF", ] diff --git a/daemon/src/broker_daemon/models/events.py b/daemon/src/broker_daemon/models/events.py index 85d69c6..294d413 100644 --- a/daemon/src/broker_daemon/models/events.py +++ b/daemon/src/broker_daemon/models/events.py @@ -14,7 +14,6 @@ class EventTopic(str, Enum): FILLS = "fills" POSITIONS = "positions" PNL = "pnl" - RISK = "risk" CONNECTION = "connection" diff --git a/daemon/src/broker_daemon/models/orders.py b/daemon/src/broker_daemon/models/orders.py index 774c046..989a5f1 100644 --- a/daemon/src/broker_daemon/models/orders.py +++ b/daemon/src/broker_daemon/models/orders.py @@ -71,7 +71,6 @@ class OrderRecord(BaseModel): fill_price: float | None = None fill_qty: float = 0 commission: float | None = None - risk_check_result: dict[str, Any] = Field(default_factory=dict) tags: dict[str, Any] = Field(default_factory=dict) diff --git a/daemon/src/broker_daemon/models/risk.py b/daemon/src/broker_daemon/models/risk.py deleted file mode 100644 index cb842b7..0000000 --- a/daemon/src/broker_daemon/models/risk.py +++ /dev/null @@ -1,42 +0,0 @@ -"""Risk models shared across daemon, CLI and SDK.""" - -from __future__ import annotations - -from datetime import UTC, datetime, timedelta -from typing import Any - -from pydantic import BaseModel, Field - - -class RiskCheckResult(BaseModel): - ok: bool - reasons: list[str] = Field(default_factory=list) - details: dict[str, Any] = Field(default_factory=dict) - suggestion: str | None = None - - -class RiskOverride(BaseModel): - param: str - value: float - reason: str - created_at: datetime = Field(default_factory=lambda: datetime.now(UTC)) - expires_at: datetime - - @classmethod - def from_duration(cls, param: str, value: float, reason: str, seconds: int) -> "RiskOverride": - now = datetime.now(UTC) - return cls(param=param, value=value, reason=reason, created_at=now, expires_at=now + timedelta(seconds=seconds)) - - -class RiskConfigSnapshot(BaseModel): - max_position_pct: float - max_order_value: float - max_daily_loss_pct: float - max_sector_exposure_pct: float - max_single_name_pct: float - max_open_orders: int - order_rate_limit: int - duplicate_window_seconds: int - symbol_allowlist: list[str] - symbol_blocklist: list[str] - halted: bool = False diff --git a/daemon/src/broker_daemon/risk/__init__.py b/daemon/src/broker_daemon/risk/__init__.py deleted file mode 100644 index ddda4de..0000000 --- a/daemon/src/broker_daemon/risk/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Risk engine package.""" - -from broker_daemon.risk.engine import RiskContext, RiskEngine -from broker_daemon.risk.monitor import ConnectionLossMonitor, HeartbeatMonitor - -__all__ = ["ConnectionLossMonitor", "HeartbeatMonitor", "RiskContext", "RiskEngine"] diff --git a/daemon/src/broker_daemon/risk/engine.py b/daemon/src/broker_daemon/risk/engine.py deleted file mode 100644 index 65e8976..0000000 --- a/daemon/src/broker_daemon/risk/engine.py +++ /dev/null @@ -1,215 +0,0 @@ -"""Risk engine implementing mandatory pre-trade checks.""" - -from __future__ import annotations - -from collections import deque -from dataclasses import dataclass, field -from datetime import UTC, datetime, timedelta -from typing import Any - -from broker_daemon.config import RiskConfig -from broker_daemon.exceptions import ErrorCode, BrokerError -from broker_daemon.models.orders import OrderRequest, Side -from broker_daemon.models.risk import RiskCheckResult, RiskConfigSnapshot, RiskOverride -from broker_daemon.risk.limits import coerce_param, config_to_dict, validate_param - - -@dataclass -class RiskContext: - nlv: float = 0.0 - daily_pnl: float = 0.0 - open_orders: int = 0 - mark_prices: dict[str, float] = field(default_factory=dict) - position_values: dict[str, float] = field(default_factory=dict) - sector_by_symbol: dict[str, str] = field(default_factory=dict) - sector_exposure_values: dict[str, float] = field(default_factory=dict) - - -class RiskEngine: - def __init__(self, config: RiskConfig) -> None: - self._limits = config_to_dict(config) - self._halted = False - self._order_times: deque[datetime] = deque() - self._duplicate_times: dict[str, datetime] = {} - self._overrides: list[RiskOverride] = [] - - @property - def halted(self) -> bool: - return self._halted - - def _cleanup_state(self) -> None: - now = datetime.now(UTC) - - while self._order_times and (now - self._order_times[0]).total_seconds() > 60: - self._order_times.popleft() - - duplicate_window = self._effective_value("duplicate_window_seconds") - self._duplicate_times = { - key: ts for key, ts in self._duplicate_times.items() if (now - ts).total_seconds() <= duplicate_window - } - - self._overrides = [ov for ov in self._overrides if ov.expires_at > now] - - def _effective_value(self, param: str) -> Any: - for override in reversed(self._overrides): - if override.param == param and override.expires_at > datetime.now(UTC): - return override.value - return self._limits[param] - - def snapshot(self) -> RiskConfigSnapshot: - self._cleanup_state() - payload = {key: self._effective_value(key) for key in self._limits} - return RiskConfigSnapshot.model_validate({**payload, "halted": self._halted}) - - def set_limit(self, param: str, value: Any) -> RiskConfigSnapshot: - key = validate_param(param) - self._limits[key] = coerce_param(key, value) - return self.snapshot() - - def override_limit(self, param: str, value: Any, duration_seconds: int, reason: str) -> RiskOverride: - key = validate_param(param) - coerced = coerce_param(key, value) - if not isinstance(coerced, (int, float)): - raise ValueError(f"risk override supports only numeric params, got '{key}'") - override = RiskOverride.from_duration(param=key, value=float(coerced), reason=reason, seconds=duration_seconds) - self._overrides.append(override) - return override - - def halt(self) -> None: - self._halted = True - - def resume(self) -> None: - self._halted = False - - def assert_order(self, order: OrderRequest, context: RiskContext) -> RiskCheckResult: - result = self.check_order(order, context) - if not result.ok: - code = ErrorCode.RISK_HALTED if self._halted else ErrorCode.RISK_CHECK_FAILED - violation_codes = {str(item) for item in result.details.get("violation_codes", [])} - if ErrorCode.RATE_LIMITED.value in violation_codes: - code = ErrorCode.RATE_LIMITED - elif ErrorCode.DUPLICATE_ORDER.value in violation_codes: - code = ErrorCode.DUPLICATE_ORDER - raise BrokerError(code, "; ".join(result.reasons), details=result.details, suggestion=result.suggestion) - return result - - def check_order(self, order: OrderRequest, context: RiskContext) -> RiskCheckResult: - self._cleanup_state() - reasons: list[str] = [] - details: dict[str, Any] = {} - violation_codes: set[str] = set() - - if self._halted: - return RiskCheckResult( - ok=False, - reasons=["trading is halted"], - details={"halted": True, "violation_codes": [ErrorCode.RISK_HALTED.value]}, - ) - - symbol = order.symbol.upper() - allowlist = self._effective_value("symbol_allowlist") - blocklist = self._effective_value("symbol_blocklist") - if allowlist and symbol not in allowlist: - reasons.append(f"symbol {symbol} is not in allowlist") - if blocklist and symbol in blocklist: - reasons.append(f"symbol {symbol} is in blocklist") - - now = datetime.now(UTC) - rate_limit = int(self._effective_value("order_rate_limit")) - if len(self._order_times) >= rate_limit: - reasons.append(f"order rate limit exceeded ({rate_limit}/minute)") - details["orders_last_minute"] = len(self._order_times) - details["limit"] = rate_limit - violation_codes.add(ErrorCode.RATE_LIMITED.value) - - duplicate_key = f"{order.side.value}:{symbol}:{order.qty}:{order.limit}:{order.stop}:{order.tif.value}" - if duplicate_key in self._duplicate_times: - reasons.append("duplicate order detected inside duplicate window") - details["duplicate_window_seconds"] = self._effective_value("duplicate_window_seconds") - violation_codes.add(ErrorCode.DUPLICATE_ORDER.value) - - mark = order.limit or order.stop or context.mark_prices.get(symbol) - if mark is None: - mark = 0.0 - notional = abs(order.qty * mark) - details["notional"] = notional - - max_order_value = float(self._effective_value("max_order_value")) - if max_order_value > 0 and notional > max_order_value: - reasons.append(f"order notional {notional:.2f} exceeds max_order_value {max_order_value:.2f}") - - max_open_orders = int(self._effective_value("max_open_orders")) - if context.open_orders >= max_open_orders: - reasons.append(f"open orders {context.open_orders} exceed max_open_orders {max_open_orders}") - - nlv = float(context.nlv or 0.0) - if nlv > 0: - current_value = float(context.position_values.get(symbol, 0.0)) - signed_notional = notional if order.side == Side.BUY else -notional - projected_value = current_value + signed_notional - projected_pct = abs(projected_value) / nlv * 100.0 - - max_position_pct = float(self._effective_value("max_position_pct")) - if projected_pct > max_position_pct: - reasons.append(f"projected position {projected_pct:.2f}% exceeds max_position_pct {max_position_pct:.2f}%") - - max_single_name_pct = float(self._effective_value("max_single_name_pct")) - if projected_pct > max_single_name_pct: - reasons.append( - f"projected position {projected_pct:.2f}% exceeds max_single_name_pct {max_single_name_pct:.2f}%" - ) - - sector = context.sector_by_symbol.get(symbol) - if sector: - current_sector = float(context.sector_exposure_values.get(sector, 0.0)) - projected_sector_pct = abs(current_sector + signed_notional) / nlv * 100.0 - details["sector"] = sector - details["projected_sector_pct"] = round(projected_sector_pct, 4) - max_sector = float(self._effective_value("max_sector_exposure_pct")) - if projected_sector_pct > max_sector: - reasons.append( - f"projected sector exposure {projected_sector_pct:.2f}% exceeds max_sector_exposure_pct {max_sector:.2f}%" - ) - - max_daily_loss_pct = float(self._effective_value("max_daily_loss_pct")) - loss_pct = abs(min(context.daily_pnl, 0.0)) / nlv * 100.0 - details["daily_loss_pct"] = round(loss_pct, 4) - if loss_pct > max_daily_loss_pct: - reasons.append(f"daily drawdown {loss_pct:.2f}% exceeds max_daily_loss_pct {max_daily_loss_pct:.2f}%") - - if reasons: - if violation_codes: - details["violation_codes"] = sorted(violation_codes) - suggestion = None - if notional > max_order_value and mark: - max_qty = int(max_order_value / mark) - suggestion = f"reduce quantity to <= {max_qty}" - return RiskCheckResult(ok=False, reasons=reasons, details=details, suggestion=suggestion) - - self._order_times.append(now) - self._duplicate_times[duplicate_key] = now - return RiskCheckResult(ok=True, reasons=[], details=details) - - def check_drawdown_breaker(self, daily_pnl: float, nlv: float) -> tuple[bool, float]: - if nlv <= 0: - return False, 0.0 - loss_pct = abs(min(daily_pnl, 0.0)) / nlv * 100.0 - breached = loss_pct > float(self._effective_value("max_daily_loss_pct")) - return breached, loss_pct - - def list_overrides(self) -> list[RiskOverride]: - self._cleanup_state() - return list(self._overrides) - - @staticmethod - def parse_duration(value: str) -> int: - raw = value.strip().lower() - if raw.endswith("h"): - return int(raw[:-1]) * 3600 - if raw.endswith("m"): - return int(raw[:-1]) * 60 - if raw.endswith("s"): - return int(raw[:-1]) - if raw.isdigit(): - return int(raw) - raise ValueError(f"invalid duration '{value}'") diff --git a/daemon/src/broker_daemon/risk/limits.py b/daemon/src/broker_daemon/risk/limits.py deleted file mode 100644 index cfb7bb5..0000000 --- a/daemon/src/broker_daemon/risk/limits.py +++ /dev/null @@ -1,57 +0,0 @@ -"""Risk parameter definitions.""" - -from __future__ import annotations - -from collections.abc import Callable -from typing import Any - -from broker_daemon.config import RiskConfig - - -def _to_float(value: Any) -> float: - return float(value) - - -def _to_int(value: Any) -> int: - return int(value) - - -def _to_symbol_list(value: Any) -> list[str]: - if isinstance(value, str): - return [item.strip().upper() for item in value.split(",") if item.strip()] - if isinstance(value, list): - return [str(item).upper() for item in value] - raise ValueError(f"unsupported symbol list value: {value!r}") - - -RISK_PARAM_COERCERS: dict[str, Callable[[Any], Any]] = { - "max_position_pct": _to_float, - "max_order_value": _to_float, - "max_daily_loss_pct": _to_float, - "max_sector_exposure_pct": _to_float, - "max_single_name_pct": _to_float, - "max_open_orders": _to_int, - "order_rate_limit": _to_int, - "duplicate_window_seconds": _to_int, - "symbol_allowlist": _to_symbol_list, - "symbol_blocklist": _to_symbol_list, -} - - -def mutable_params() -> list[str]: - return sorted(RISK_PARAM_COERCERS.keys()) - - -def validate_param(name: str) -> str: - if name not in RISK_PARAM_COERCERS: - valid = ", ".join(mutable_params()) - raise ValueError(f"unknown risk parameter '{name}'. valid params: {valid}") - return name - - -def coerce_param(name: str, value: Any) -> Any: - return RISK_PARAM_COERCERS[validate_param(name)](value) - - -def config_to_dict(cfg: RiskConfig) -> dict[str, Any]: - return cfg.model_dump(mode="python") diff --git a/daemon/src/broker_daemon/risk/monitor.py b/daemon/src/broker_daemon/risk/monitor.py deleted file mode 100644 index a275e1f..0000000 --- a/daemon/src/broker_daemon/risk/monitor.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Continuous risk monitors for drawdown and heartbeat policies.""" - -from __future__ import annotations - -from datetime import UTC, datetime - - -class HeartbeatMonitor: - def __init__(self, timeout_seconds: int) -> None: - self._timeout_seconds = timeout_seconds - self._last_heartbeat: datetime | None = None - - def beat(self) -> None: - self._last_heartbeat = datetime.now(UTC) - - def seconds_since_last(self) -> float | None: - if not self._last_heartbeat: - return None - return (datetime.now(UTC) - self._last_heartbeat).total_seconds() - - def is_timed_out(self) -> bool: - delta = self.seconds_since_last() - if delta is None: - return False - return delta > self._timeout_seconds - - -class ConnectionLossMonitor: - def __init__(self, threshold_seconds: int = 30) -> None: - self._threshold_seconds = threshold_seconds - self._disconnected_at: datetime | None = None - - def on_connected(self) -> None: - self._disconnected_at = None - - def on_disconnected(self) -> None: - if self._disconnected_at is None: - self._disconnected_at = datetime.now(UTC) - - def breached(self) -> bool: - if self._disconnected_at is None: - return False - seconds = (datetime.now(UTC) - self._disconnected_at).total_seconds() - return seconds > self._threshold_seconds diff --git a/daemon/tests/test_daemon/test_dispatch_validation.py b/daemon/tests/test_daemon/test_dispatch_validation.py index bc462eb..8d2311b 100644 --- a/daemon/tests/test_daemon/test_dispatch_validation.py +++ b/daemon/tests/test_daemon/test_dispatch_validation.py @@ -6,9 +6,7 @@ from broker_daemon.daemon.server import DaemonServer from broker_daemon.exceptions import ErrorCode, BrokerError from broker_daemon.models.market import OptionChain, OptionChainEntry -from broker_daemon.models.risk import RiskCheckResult from broker_daemon.protocol import Request -from broker_daemon.risk.engine import RiskContext def _test_config(tmp_path) -> AppConfig: @@ -140,25 +138,13 @@ async def fake_chain(**_: object) -> OptionChain: async def test_dispatch_order_place_dry_run_preview(monkeypatch: pytest.MonkeyPatch, tmp_path) -> None: server = DaemonServer(_test_config(tmp_path)) - async def fake_risk_context() -> RiskContext: - return RiskContext(nlv=1_000_000.0, daily_pnl=0.0, open_orders=0, mark_prices={"AAPL": 200.0}) - called: dict[str, bool] = {"place_order": False} async def fake_place_order(*_: object, **__: object) -> object: called["place_order"] = True raise AssertionError("place_order should not be called for dry-run") - def fake_check_order(*_: object, **__: object) -> RiskCheckResult: - return RiskCheckResult(ok=True, reasons=[], details={"notional": 200.0}) - - async def fake_log_risk_event(*_: object, **__: object) -> None: - return None - - monkeypatch.setattr(server._orders, "_risk_context", fake_risk_context) # noqa: SLF001 monkeypatch.setattr(server._orders, "place_order", fake_place_order) # noqa: SLF001 - monkeypatch.setattr(server._risk, "check_order", fake_check_order) # noqa: SLF001 - monkeypatch.setattr(server._audit, "log_risk_event", fake_log_risk_event) # noqa: SLF001 req = Request( command="order.place", @@ -168,8 +154,7 @@ async def fake_log_risk_event(*_: object, **__: object) -> None: assert called["place_order"] is False assert data["dry_run"] is True - assert data["submit_allowed"] is True - assert data["order"]["status"].startswith("DryRun") + assert data["order"]["status"] == "DryRunPreview" @pytest.mark.asyncio diff --git a/daemon/tests/test_daemon/test_load_order_manager.py b/daemon/tests/test_daemon/test_load_order_manager.py index 2e937bf..4143766 100644 --- a/daemon/tests/test_daemon/test_load_order_manager.py +++ b/daemon/tests/test_daemon/test_load_order_manager.py @@ -5,12 +5,8 @@ import pytest -from broker_daemon.config import RiskConfig from broker_daemon.daemon.order_manager import OrderManager -from broker_daemon.exceptions import ErrorCode, BrokerError from broker_daemon.models.orders import OrderRequest -from broker_daemon.models.portfolio import Balance, PnLSummary -from broker_daemon.risk.engine import RiskEngine class _FakeConnection: @@ -18,7 +14,8 @@ def __init__(self) -> None: self.place_calls = 0 self._order_id = 1000 - async def balance(self) -> Balance: + async def balance(self) -> Any: + from broker_daemon.models.portfolio import Balance return Balance(net_liquidation=1_000_000) async def positions(self) -> list[Any]: @@ -27,7 +24,8 @@ async def positions(self) -> list[Any]: async def quote(self, _symbols: list[str]) -> list[Any]: return [] - async def pnl(self) -> PnLSummary: + async def pnl(self) -> Any: + from broker_daemon.models.portfolio import PnLSummary return PnLSummary(total=0.0) async def place_order(self, _request: OrderRequest, _client_order_id: str) -> dict[str, Any]: @@ -52,32 +50,19 @@ class _FakeAudit: async def upsert_order(self, _record: Any) -> None: return None - async def log_risk_event(self, _event_type: str, _details: dict[str, Any]) -> None: - return None - async def log_fill(self, _fill: Any) -> None: return None -async def _new_manager(order_rate_limit: int = 500) -> tuple[OrderManager, _FakeConnection]: +async def _new_manager() -> tuple[OrderManager, _FakeConnection]: conn = _FakeConnection() - risk = RiskEngine( - RiskConfig( - max_position_pct=100, - max_single_name_pct=100, - max_order_value=1_000_000, - max_open_orders=10_000, - order_rate_limit=order_rate_limit, - duplicate_window_seconds=1, - ) - ) - manager = OrderManager(provider=conn, risk=risk, audit=_FakeAudit(), event_cb=None) + manager = OrderManager(provider=conn, audit=_FakeAudit(), event_cb=None) return manager, conn @pytest.mark.asyncio async def test_rapid_order_submission_accepts_large_burst() -> None: - manager, conn = await _new_manager(order_rate_limit=500) + manager, conn = await _new_manager() async def _submit(i: int) -> None: req = OrderRequest( @@ -94,27 +79,3 @@ async def _submit(i: int) -> None: rows = await manager.list_orders(status="all") assert conn.place_calls == 120 assert len(rows) == 120 - - -@pytest.mark.asyncio -async def test_rapid_order_submission_triggers_rate_limit() -> None: - manager, _ = await _new_manager(order_rate_limit=5) - - async def _submit(i: int) -> Exception | None: - req = OrderRequest( - side="buy", - symbol="AAPL", - qty=float(i + 1), - limit=100.0, - client_order_id=f"rl-{i}", - ) - try: - await manager.place_order(req) - return None - except Exception as exc: # noqa: BLE001 - return exc - - outcomes = await asyncio.gather(*[_submit(i) for i in range(30)]) - failures = [out for out in outcomes if isinstance(out, Exception)] - assert failures, "expected rate limiting failures under burst load" - assert any(isinstance(err, BrokerError) and err.code == ErrorCode.RATE_LIMITED for err in failures) diff --git a/daemon/tests/test_risk/test_engine.py b/daemon/tests/test_risk/test_engine.py deleted file mode 100644 index 66b62d5..0000000 --- a/daemon/tests/test_risk/test_engine.py +++ /dev/null @@ -1,36 +0,0 @@ -from __future__ import annotations - -from broker_daemon.config import RiskConfig -from broker_daemon.models.orders import OrderRequest -from broker_daemon.risk.engine import RiskContext, RiskEngine - - -def test_order_value_limit_blocks_large_order() -> None: - engine = RiskEngine(RiskConfig(max_order_value=5000)) - order = OrderRequest(side="buy", symbol="AAPL", qty=100, limit=100) - result = engine.check_order(order, RiskContext(nlv=100_000, daily_pnl=0)) - - assert result.ok is False - assert any("max_order_value" in reason for reason in result.reasons) - - -def test_rate_limit_and_duplicate_detection() -> None: - engine = RiskEngine(RiskConfig(order_rate_limit=1, duplicate_window_seconds=60)) - ctx = RiskContext(nlv=100_000) - first = OrderRequest(side="buy", symbol="MSFT", qty=10, limit=100) - second = OrderRequest(side="buy", symbol="MSFT", qty=10, limit=100) - - first_result = engine.check_order(first, ctx) - second_result = engine.check_order(second, ctx) - - assert first_result.ok is True - assert second_result.ok is False - assert any("rate limit" in reason or "duplicate" in reason for reason in second_result.reasons) - - -def test_override_changes_effective_limit() -> None: - engine = RiskEngine(RiskConfig(max_order_value=1000)) - engine.override_limit("max_order_value", 5000, duration_seconds=3600, reason="manual") - snapshot = engine.snapshot() - - assert snapshot.max_order_value == 5000 diff --git a/daemon/tests/test_risk/test_error_codes.py b/daemon/tests/test_risk/test_error_codes.py deleted file mode 100644 index 5532261..0000000 --- a/daemon/tests/test_risk/test_error_codes.py +++ /dev/null @@ -1,35 +0,0 @@ -from __future__ import annotations - -import pytest - -from broker_daemon.config import RiskConfig -from broker_daemon.exceptions import ErrorCode, BrokerError -from broker_daemon.models.orders import OrderRequest -from broker_daemon.risk.engine import RiskContext, RiskEngine - - -def test_assert_order_raises_rate_limited_code() -> None: - engine = RiskEngine(RiskConfig(order_rate_limit=1, duplicate_window_seconds=1)) - ctx = RiskContext(nlv=100_000) - first = OrderRequest(side="buy", symbol="AAPL", qty=1, limit=100) - second = OrderRequest(side="buy", symbol="MSFT", qty=1, limit=100) - - engine.assert_order(first, ctx) - with pytest.raises(BrokerError) as exc: - engine.assert_order(second, ctx) - - assert exc.value.code == ErrorCode.RATE_LIMITED - assert ErrorCode.RATE_LIMITED.value in exc.value.details.get("violation_codes", []) - - -def test_assert_order_raises_duplicate_code() -> None: - engine = RiskEngine(RiskConfig(order_rate_limit=100, duplicate_window_seconds=60)) - ctx = RiskContext(nlv=100_000) - order = OrderRequest(side="buy", symbol="AAPL", qty=5, limit=100) - - engine.assert_order(order, ctx) - with pytest.raises(BrokerError) as exc: - engine.assert_order(order, ctx) - - assert exc.value.code == ErrorCode.DUPLICATE_ORDER - assert ErrorCode.DUPLICATE_ORDER.value in exc.value.details.get("violation_codes", []) diff --git a/daemon/tests/test_risk/test_monitors.py b/daemon/tests/test_risk/test_monitors.py deleted file mode 100644 index 9908c19..0000000 --- a/daemon/tests/test_risk/test_monitors.py +++ /dev/null @@ -1,27 +0,0 @@ -from __future__ import annotations - -from datetime import UTC, datetime, timedelta - -from broker_daemon.risk.monitor import ConnectionLossMonitor, HeartbeatMonitor - - -def test_heartbeat_monitor_timeout_behavior() -> None: - monitor = HeartbeatMonitor(timeout_seconds=10) - assert monitor.is_timed_out() is False - - monitor.beat() - assert monitor.is_timed_out() is False - monitor._last_heartbeat = datetime.now(UTC) - timedelta(seconds=11) # noqa: SLF001 - assert monitor.is_timed_out() is True - - -def test_connection_loss_monitor_breach_behavior() -> None: - monitor = ConnectionLossMonitor(threshold_seconds=30) - assert monitor.breached() is False - - monitor.on_disconnected() - monitor._disconnected_at = datetime.now(UTC) - timedelta(seconds=31) # noqa: SLF001 - assert monitor.breached() is True - - monitor.on_connected() - assert monitor.breached() is False diff --git a/scripts/load_test_orders.py b/scripts/load_test_orders.py index dfa602f..86a49f3 100755 --- a/scripts/load_test_orders.py +++ b/scripts/load_test_orders.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -"""Burst load test harness for broker-daemon order/risk endpoints.""" +"""Burst load test harness for broker-daemon order endpoints.""" from __future__ import annotations @@ -35,13 +35,7 @@ def _parse_args() -> argparse.Namespace: parser.add_argument("--concurrency", type=int, default=20, help="Concurrent in-flight requests") parser.add_argument("--symbol", type=str, default="AAPL", help="Ticker symbol") parser.add_argument("--qty", type=float, default=1.0, help="Base quantity") - parser.add_argument("--limit", type=float, default=100.0, help="Limit price for order/risk-check payload") - parser.add_argument( - "--mode", - choices=["risk-check", "order"], - default="risk-check", - help="risk-check is safer; order submits real/paper orders through daemon", - ) + parser.add_argument("--limit", type=float, default=100.0, help="Limit price for order payload") parser.add_argument("--json", action="store_true", help="Emit summary as JSON") return parser.parse_args() @@ -49,23 +43,14 @@ def _parse_args() -> argparse.Namespace: async def _run_once(client: Client, args: argparse.Namespace, index: int) -> RunResult: started = time.perf_counter() try: - if args.mode == "risk-check": - await client.risk_check( - side="buy", - symbol=args.symbol, - qty=args.qty + (index % 3) * 0.01, - limit=args.limit, - tif="DAY", - ) - else: - await client.order( - side="buy", - symbol=args.symbol, - qty=args.qty + (index % 3) * 0.01, - limit=args.limit, - tif="DAY", - client_order_id=f"load-test-{index}", - ) + await client.order( + side="buy", + symbol=args.symbol, + qty=args.qty + (index % 3) * 0.01, + limit=args.limit, + tif="DAY", + client_order_id=f"load-test-{index}", + ) return RunResult(ok=True, latency_ms=(time.perf_counter() - started) * 1000.0) except BrokerError as exc: return RunResult( @@ -99,7 +84,6 @@ async def _worker(i: int) -> RunResult: error_counts[code] = error_counts.get(code, 0) + 1 return { - "mode": args.mode, "count": args.count, "concurrency": args.concurrency, "success": len(successes), @@ -133,7 +117,7 @@ def main() -> None: print(json.dumps(summary, separators=(",", ":"))) return - print(f"mode={summary['mode']} count={summary['count']} concurrency={summary['concurrency']}") + print(f"count={summary['count']} concurrency={summary['concurrency']}") print(f"success={summary['success']} failed={summary['failed']} throughput_rps={summary['throughput_rps']}") print( "latency_ms " diff --git a/sdk/python/src/broker_sdk/__init__.py b/sdk/python/src/broker_sdk/__init__.py index bb58cb3..b1c007a 100644 --- a/sdk/python/src/broker_sdk/__init__.py +++ b/sdk/python/src/broker_sdk/__init__.py @@ -24,8 +24,6 @@ OrderStatusFilter, ORDER_SIDES, ORDER_STATUS_FILTERS, - RiskParam, - RISK_PARAMS, TimeInForce, TIME_IN_FORCE_VALUES, ) @@ -54,8 +52,6 @@ "CHAIN_FIELDS", "ORDER_SIDES", "ORDER_STATUS_FILTERS", - "RiskParam", - "RISK_PARAMS", "TimeInForce", "TIME_IN_FORCE_VALUES", ] diff --git a/sdk/python/src/broker_sdk/client.py b/sdk/python/src/broker_sdk/client.py index 71c4a12..a2ba8cb 100644 --- a/sdk/python/src/broker_sdk/client.py +++ b/sdk/python/src/broker_sdk/client.py @@ -24,7 +24,6 @@ OrderSide, OrderStatusFilter, QuoteIntent, - RiskParam, TimeInForce, ) @@ -32,7 +31,7 @@ class Client: """Async broker client over the daemon Unix socket. - All operations route through `broker-daemon`, so risk checks and audit logging are always enforced. + All operations route through `broker-daemon`, so audit logging is always enforced. """ def __init__(self, socket_path: str | Path | None = None, timeout_seconds: int | None = None) -> None: @@ -325,46 +324,6 @@ async def balance(self) -> dict[str, Any]: async def exposure(self, by: ExposureGroupBy = "symbol") -> dict[str, Any]: return await self._request("portfolio.exposure", {"by": by}) - async def risk_check( - self, - *, - side: OrderSide, - symbol: str, - qty: float, - limit: float | None = None, - stop: float | None = None, - tif: TimeInForce = "DAY", - ) -> dict[str, Any]: - params: dict[str, Any] = { - "side": side, - "symbol": symbol, - "qty": qty, - "tif": tif, - } - if limit is not None: - params["limit"] = limit - if stop is not None: - params["stop"] = stop - return await self._request("risk.check", params) - - async def risk_limits(self) -> dict[str, Any]: - return await self._request("risk.limits") - - async def risk_set(self, param: RiskParam, value: Any) -> dict[str, Any]: - return await self._request("risk.set", {"param": param, "value": value}) - - async def risk_halt(self) -> dict[str, Any]: - return await self._request("risk.halt") - - async def risk_resume(self) -> dict[str, Any]: - return await self._request("risk.resume") - - async def risk_override(self, *, param: RiskParam, value: Any, duration: str, reason: str) -> dict[str, Any]: - return await self._request( - "risk.override", - {"param": param, "value": value, "duration": duration, "reason": reason}, - ) - async def keepalive(self) -> dict[str, Any]: return await self._request("runtime.keepalive", {"sent_at": time.time()}) @@ -391,12 +350,6 @@ async def audit_orders(self, status: OrderStatusFilter | None = None, since: str params["since"] = since return await self._request("audit.orders", params) - async def audit_risk(self, event_type: str | None = None) -> dict[str, Any]: - params: dict[str, Any] = {} - if event_type: - params["type"] = event_type - return await self._request("audit.risk", params) - async def audit_export( self, *, diff --git a/sdk/python/src/broker_sdk/types.py b/sdk/python/src/broker_sdk/types.py index 0c82cab..4346b6f 100644 --- a/sdk/python/src/broker_sdk/types.py +++ b/sdk/python/src/broker_sdk/types.py @@ -13,21 +13,9 @@ QUOTE_INTENTS = ("best_effort", "top_of_book", "last_only") ORDER_STATUS_FILTERS = ("active", "filled", "cancelled", "all") EXPOSURE_GROUPS = ("sector", "asset_class", "currency", "symbol") -EVENT_TOPICS = ("orders", "fills", "positions", "pnl", "risk", "connection") -AUDIT_TABLES = ("orders", "commands", "risk") +EVENT_TOPICS = ("orders", "fills", "positions", "pnl", "connection") +AUDIT_TABLES = ("orders", "commands") AUDIT_SOURCES = ("cli", "sdk", "ts_sdk") -RISK_PARAMS = ( - "max_position_pct", - "max_order_value", - "max_daily_loss_pct", - "max_sector_exposure_pct", - "max_single_name_pct", - "max_open_orders", - "order_rate_limit", - "duplicate_window_seconds", - "symbol_allowlist", - "symbol_blocklist", -) OrderSide: TypeAlias = Literal["buy", "sell"] TimeInForce: TypeAlias = Literal["DAY", "GTC", "IOC"] @@ -38,18 +26,6 @@ QuoteIntent: TypeAlias = Literal["best_effort", "top_of_book", "last_only"] OrderStatusFilter: TypeAlias = Literal["active", "filled", "cancelled", "all"] ExposureGroupBy: TypeAlias = Literal["sector", "asset_class", "currency", "symbol"] -EventTopic: TypeAlias = Literal["orders", "fills", "positions", "pnl", "risk", "connection"] -AuditTable: TypeAlias = Literal["orders", "commands", "risk"] +EventTopic: TypeAlias = Literal["orders", "fills", "positions", "pnl", "connection"] +AuditTable: TypeAlias = Literal["orders", "commands"] AuditSource: TypeAlias = Literal["cli", "sdk", "ts_sdk"] -RiskParam: TypeAlias = Literal[ - "max_position_pct", - "max_order_value", - "max_daily_loss_pct", - "max_sector_exposure_pct", - "max_single_name_pct", - "max_open_orders", - "order_rate_limit", - "duplicate_window_seconds", - "symbol_allowlist", - "symbol_blocklist", -] diff --git a/sdk/python/tests/test_sdk/test_client_response.py b/sdk/python/tests/test_sdk/test_client_response.py index eb4295c..68ce2be 100644 --- a/sdk/python/tests/test_sdk/test_client_response.py +++ b/sdk/python/tests/test_sdk/test_client_response.py @@ -4,7 +4,7 @@ from broker_daemon.exceptions import ErrorCode, BrokerError from broker_daemon.protocol import ErrorResponse, Response -from broker_sdk import EVENT_TOPICS, RISK_PARAMS +from broker_sdk import EVENT_TOPICS from broker_sdk.client import _unwrap_response @@ -26,4 +26,3 @@ def test_unwrap_error() -> None: def test_exported_constants_are_non_empty() -> None: assert len(EVENT_TOPICS) > 0 - assert "max_order_value" in RISK_PARAMS diff --git a/sdk/typescript/src/client.ts b/sdk/typescript/src/client.ts index 463bb2f..ded4c49 100644 --- a/sdk/typescript/src/client.ts +++ b/sdk/typescript/src/client.ts @@ -24,7 +24,6 @@ import type { AuditCommandsResponse, AuditExportResponse, AuditOrdersResponse, - AuditRiskResponse, BarSize, BracketInput, DaemonStatusResponse, @@ -51,15 +50,7 @@ import type { PortfolioPositionsResponse, PortfolioSnapshotResponse, QuoteIntent, - QuoteSnapshotResponse, - RiskCheckInput, - RiskCheckResult, - RiskHaltResponse, - RiskLimitsResponse, - RiskParam, - RiskOverrideResponse, - RiskResumeResponse, - RiskSetResponse + QuoteSnapshotResponse } from "./sdk-types.js"; export interface ClientOptions { @@ -329,52 +320,6 @@ export class Client { return this.request("fills.list", params); } - async riskCheck(input: RiskCheckInput): Promise { - const params: CommandParams<"risk.check"> = { - side: input.side, - symbol: input.symbol, - qty: input.qty, - tif: input.tif ?? "DAY" - }; - if (input.limit !== undefined) { - params.limit = input.limit; - } - if (input.stop !== undefined) { - params.stop = input.stop; - } - return this.request("risk.check", params); - } - - async riskLimits(): Promise { - return this.request("risk.limits", {}); - } - - async riskSet(param: RiskParam, value: JsonValue): Promise { - return this.request("risk.set", { param, value }); - } - - async riskHalt(): Promise { - return this.request("risk.halt", {}); - } - - async riskResume(): Promise { - return this.request("risk.resume", {}); - } - - async riskOverride(input: { - param: RiskParam; - value: JsonValue; - duration: string; - reason: string; - }): Promise { - return this.request("risk.override", { - param: input.param, - value: input.value, - duration: input.duration, - reason: input.reason - }); - } - async keepalive(sentAt?: number): Promise { return this.request("runtime.keepalive", { sent_at: sentAt ?? Date.now() / 1000 @@ -406,14 +351,6 @@ export class Client { return this.request("audit.orders", params); } - async auditRisk(type?: string): Promise { - const params: CommandParams<"audit.risk"> = {}; - if (type) { - params.type = type; - } - return this.request("audit.risk", params); - } - async auditExport(input: { output: string; table?: AuditTable; diff --git a/sdk/typescript/src/commands.ts b/sdk/typescript/src/commands.ts index 42db551..b5818ea 100644 --- a/sdk/typescript/src/commands.ts +++ b/sdk/typescript/src/commands.ts @@ -7,7 +7,6 @@ import type { AuditExportResponse, AuditTable, AuditOrdersResponse, - AuditRiskResponse, BarSize, DaemonStatusResponse, DaemonStopResponse, @@ -34,14 +33,7 @@ import type { PortfolioSnapshotResponse, QuoteSnapshotResponse, QuoteIntent, - RiskParam, - RiskCheckResult, - RiskHaltResponse, - RiskLimitsResponse, - RiskOverrideResponse, - RiskResumeResponse, - TimeInForce, - RiskSetResponse + TimeInForce } from "./sdk-types.js"; export interface CommandSpec { @@ -119,34 +111,9 @@ export interface CommandMap { "order.cancel": CommandSpec<{ order_id: string }, OrderCancelResponse>; "orders.cancel_all": CommandSpec<{ confirm?: boolean; json_mode?: boolean }, OrdersCancelAllResponse>; "fills.list": CommandSpec<{ since?: string; symbol?: string }, FillsListResponse>; - "risk.check": CommandSpec< - { - side: OrderSide; - symbol: string; - qty: number; - tif?: TimeInForce; - limit?: number; - stop?: number; - }, - RiskCheckResult - >; - "risk.limits": CommandSpec, RiskLimitsResponse>; - "risk.set": CommandSpec<{ param: RiskParam; value: JsonValue }, RiskSetResponse>; - "risk.halt": CommandSpec, RiskHaltResponse>; - "risk.resume": CommandSpec, RiskResumeResponse>; - "risk.override": CommandSpec< - { - param: RiskParam; - value: JsonValue; - duration: string; - reason: string; - }, - RiskOverrideResponse - >; "runtime.keepalive": CommandSpec<{ sent_at?: number }, KeepaliveResponse>; "audit.commands": CommandSpec<{ source?: AuditSource; since?: string; request_id?: string }, AuditCommandsResponse>; "audit.orders": CommandSpec<{ status?: OrderStatusFilter; since?: string }, AuditOrdersResponse>; - "audit.risk": CommandSpec<{ type?: string }, AuditRiskResponse>; "audit.export": CommandSpec< { output: string; diff --git a/sdk/typescript/src/config.ts b/sdk/typescript/src/config.ts index 48b4bc1..13578bf 100644 --- a/sdk/typescript/src/config.ts +++ b/sdk/typescript/src/config.ts @@ -30,18 +30,6 @@ const DEFAULT_CONFIG: AppConfig = { auto_reconnect: true, reconnect_backoff_max: 30 }, - risk: { - max_position_pct: 10.0, - max_order_value: 50000, - max_daily_loss_pct: 2.0, - max_sector_exposure_pct: 30.0, - max_single_name_pct: 10.0, - max_open_orders: 20, - order_rate_limit: 10, - duplicate_window_seconds: 60, - symbol_allowlist: [], - symbol_blocklist: [] - }, logging: { level: "INFO", audit_db: path.join(DEFAULT_STATE_HOME, "audit.db"), @@ -125,7 +113,6 @@ function mergeSection(target: T, source: Partial | undefine function normalizeConfig(raw: Partial): AppConfig { const cfg = cloneDefaults(); cfg.gateway = mergeSection(cfg.gateway, raw.gateway); - cfg.risk = mergeSection(cfg.risk, raw.risk); cfg.logging = mergeSection(cfg.logging, raw.logging); cfg.agent = mergeSection(cfg.agent, raw.agent); cfg.output = mergeSection(cfg.output, raw.output); @@ -141,7 +128,7 @@ function normalizeConfig(raw: Partial): AppConfig { function applyEnvOverrides(base: AppConfig): AppConfig { const out = structuredClone(base); - const sections = new Set(["gateway", "risk", "logging", "agent", "output", "runtime"]); + const sections = new Set(["gateway", "logging", "agent", "output", "runtime"]); for (const [key, raw] of Object.entries(process.env)) { if (!key.startsWith("BROKER_") || raw === undefined) { @@ -197,9 +184,6 @@ function extractBrokerConfig(raw: unknown): Partial { if (isRecord(broker.gateway)) { out.gateway = broker.gateway as unknown as AppConfig["gateway"]; } - if (isRecord(broker.risk)) { - out.risk = broker.risk as unknown as AppConfig["risk"]; - } if (isRecord(broker.logging)) { out.logging = broker.logging as unknown as AppConfig["logging"]; } diff --git a/sdk/typescript/src/errors.ts b/sdk/typescript/src/errors.ts index a1ec95e..a9f9d81 100644 --- a/sdk/typescript/src/errors.ts +++ b/sdk/typescript/src/errors.ts @@ -4,10 +4,7 @@ export enum ErrorCode { DAEMON_NOT_RUNNING = "DAEMON_NOT_RUNNING", IB_DISCONNECTED = "IB_DISCONNECTED", IB_REJECTED = "IB_REJECTED", - RISK_CHECK_FAILED = "RISK_CHECK_FAILED", - RISK_HALTED = "RISK_HALTED", RATE_LIMITED = "RATE_LIMITED", - DUPLICATE_ORDER = "DUPLICATE_ORDER", INVALID_SYMBOL = "INVALID_SYMBOL", INVALID_ARGS = "INVALID_ARGS", TIMEOUT = "TIMEOUT", @@ -18,8 +15,6 @@ export const EXIT_CODE_BY_ERROR: Record = { [ErrorCode.INVALID_ARGS]: 2, [ErrorCode.DAEMON_NOT_RUNNING]: 3, [ErrorCode.IB_DISCONNECTED]: 4, - [ErrorCode.RISK_CHECK_FAILED]: 5, - [ErrorCode.RISK_HALTED]: 6, [ErrorCode.TIMEOUT]: 10 }; diff --git a/sdk/typescript/src/index.ts b/sdk/typescript/src/index.ts index 2451321..b54b6b1 100644 --- a/sdk/typescript/src/index.ts +++ b/sdk/typescript/src/index.ts @@ -14,7 +14,6 @@ export { QUOTE_INTENTS, ORDER_SIDES, ORDER_STATUS_FILTERS, - RISK_PARAMS, TIME_IN_FORCE_VALUES } from "./sdk-types.js"; @@ -38,7 +37,6 @@ export type { OutputConfig, RequestEnvelope, ResponseEnvelope, - RiskConfig, RuntimeConfig } from "./types.js"; @@ -52,8 +50,6 @@ export type { AuditExportResponse, AuditOrdersResponse, AuditOrdersRow, - AuditRiskResponse, - AuditRiskRow, Balance, BarSize, Bar, @@ -97,15 +93,5 @@ export type { Quote, QuoteSnapshotResponse, MarketCapabilitiesResponse, - RiskParam, - RiskCheckInput, - RiskCheckResult, - RiskConfigSnapshot, - RiskHaltResponse, - RiskLimitsResponse, - RiskOverride, - RiskOverrideResponse, - RiskResumeResponse, - RiskSetResponse, TimeInForce } from "./sdk-types.js"; diff --git a/sdk/typescript/src/sdk-types.ts b/sdk/typescript/src/sdk-types.ts index 8f16dad..a244e00 100644 --- a/sdk/typescript/src/sdk-types.ts +++ b/sdk/typescript/src/sdk-types.ts @@ -9,21 +9,9 @@ export const CHAIN_FIELDS = ["symbol", "right", "strike", "expiry", "bid", "ask" export const QUOTE_INTENTS = ["best_effort", "top_of_book", "last_only"] as const; export const ORDER_STATUS_FILTERS = ["active", "filled", "cancelled", "all"] as const; export const EXPOSURE_GROUPS = ["sector", "asset_class", "currency", "symbol"] as const; -export const EVENT_TOPICS = ["orders", "fills", "positions", "pnl", "risk", "connection"] as const; -export const AUDIT_TABLES = ["orders", "commands", "risk"] as const; +export const EVENT_TOPICS = ["orders", "fills", "positions", "pnl", "connection"] as const; +export const AUDIT_TABLES = ["orders", "commands"] as const; export const AUDIT_SOURCES = ["cli", "sdk", "ts_sdk"] as const; -export const RISK_PARAMS = [ - "max_position_pct", - "max_order_value", - "max_daily_loss_pct", - "max_sector_exposure_pct", - "max_single_name_pct", - "max_open_orders", - "order_rate_limit", - "duplicate_window_seconds", - "symbol_allowlist", - "symbol_blocklist" -] as const; export type OrderSide = (typeof ORDER_SIDES)[number]; export type TimeInForce = (typeof TIME_IN_FORCE_VALUES)[number]; @@ -37,7 +25,6 @@ export type ExposureGroupBy = (typeof EXPOSURE_GROUPS)[number]; export type EventTopic = (typeof EVENT_TOPICS)[number]; export type AuditTable = (typeof AUDIT_TABLES)[number]; export type AuditSource = (typeof AUDIT_SOURCES)[number]; -export type RiskParam = (typeof RISK_PARAMS)[number]; export interface Quote { symbol: string; @@ -175,7 +162,6 @@ export interface OrderRecord { fill_price: number | null; fill_qty: number; commission: number | null; - risk_check_result: Record; } export interface FillRecord { @@ -191,35 +177,6 @@ export interface FillRecord { decision_id?: string | null; } -export interface RiskCheckResult { - ok: boolean; - reasons: string[]; - details: Record; - suggestion?: string | null; -} - -export interface RiskConfigSnapshot { - max_position_pct: number; - max_order_value: number; - max_daily_loss_pct: number; - max_sector_exposure_pct: number; - max_single_name_pct: number; - max_open_orders: number; - order_rate_limit: number; - duplicate_window_seconds: number; - symbol_allowlist: string[]; - symbol_blocklist: string[]; - halted: boolean; -} - -export interface RiskOverride { - param: string; - value: number; - reason: string; - created_at: string; - expires_at: string; -} - export interface DaemonStatusResponse { uptime_seconds: number; connection: { @@ -233,7 +190,6 @@ export interface DaemonStatusResponse { last_error: string | null; }; provider_capabilities: Record; - risk_halted: boolean; time_sync_delta_ms: number | null; socket: string; } @@ -284,8 +240,6 @@ export interface PortfolioSnapshotResponse { pnl: PnLSummary; exposure: ExposureEntry[]; exposure_by: string; - risk_limits: RiskConfigSnapshot; - risk_halted: boolean; connection: DaemonStatusResponse["connection"]; provider_capabilities: ProviderQuoteCapabilities; provider_capabilities_cache: CapabilityCacheMeta; @@ -294,8 +248,6 @@ export interface PortfolioSnapshotResponse { export interface OrderPlaceResponse { order: OrderRecord; dry_run: boolean; - risk_check: RiskCheckResult; - submit_allowed: boolean; } export interface OrderBracketResponse { @@ -326,31 +278,10 @@ export interface FillsListResponse { fills: FillRecord[]; } -export interface RiskLimitsResponse { - limits: RiskConfigSnapshot; -} - -export interface RiskSetResponse { - limits: RiskConfigSnapshot; -} - -export interface RiskHaltResponse { - halted: boolean; -} - -export interface RiskResumeResponse { - halted: boolean; -} - -export interface RiskOverrideResponse { - override: RiskOverride; -} - export interface KeepaliveResponse { ok: boolean; latency_ms: number | null; connected: boolean; - halted: boolean; } export interface AuditCommandsRow { @@ -379,13 +310,6 @@ export interface AuditOrdersRow { fill_price: number | null; fill_qty: number | null; commission: number | null; - risk_check_result: string; -} - -export interface AuditRiskRow { - timestamp: string; - event_type: string; - details: string; } export interface AuditCommandsResponse { @@ -396,10 +320,6 @@ export interface AuditOrdersResponse { orders: AuditOrdersRow[]; } -export interface AuditRiskResponse { - risk_events: AuditRiskRow[]; -} - export interface AuditExportResponse { output: string; rows: number; @@ -439,11 +359,3 @@ export interface BracketInput { decision_reasoning?: string; } -export interface RiskCheckInput { - side: OrderSide; - symbol: string; - qty: number; - limit?: number; - stop?: number; - tif?: TimeInForce; -} diff --git a/sdk/typescript/src/types.ts b/sdk/typescript/src/types.ts index b5abdb9..0aaf6c6 100644 --- a/sdk/typescript/src/types.ts +++ b/sdk/typescript/src/types.ts @@ -40,19 +40,6 @@ export interface GatewayConfig { reconnect_backoff_max: number; } -export interface RiskConfig { - max_position_pct: number; - max_order_value: number; - max_daily_loss_pct: number; - max_sector_exposure_pct: number; - max_single_name_pct: number; - max_open_orders: number; - order_rate_limit: number; - duplicate_window_seconds: number; - symbol_allowlist: string[]; - symbol_blocklist: string[]; -} - export interface LoggingConfig { level: string; audit_db: string; @@ -79,7 +66,6 @@ export interface RuntimeConfig { export interface AppConfig { gateway: GatewayConfig; - risk: RiskConfig; logging: LoggingConfig; agent: AgentConfig; output: OutputConfig; diff --git a/skills/broker/SKILL.md b/skills/broker/SKILL.md index bb1683e..cd0eff8 100644 --- a/skills/broker/SKILL.md +++ b/skills/broker/SKILL.md @@ -45,23 +45,6 @@ Every order submission (`buy`, `sell`, `bracket`) requires three decision flags. These are stored in the fund observability repo as decision records. Write substantive reasoning — it becomes the audit trail for every trade. -## Risk Limit Defaults - -These are the default risk limits. Query live values with `broker limits`. - -| Parameter | Default | Description | -|---|---|---| -| `max_position_pct` | 10.0 | Max % of NLV in one position | -| `max_order_value` | 50000 | Max dollar value per order | -| `max_daily_loss_pct` | 2.0 | Max daily loss as % of NLV | -| `max_sector_exposure_pct` | 30.0 | Max % exposure to one sector | -| `max_single_name_pct` | 10.0 | Max % of NLV in a single name | -| `max_open_orders` | 20 | Max simultaneous open orders | -| `order_rate_limit` | 10 | Max orders per minute | -| `duplicate_window_seconds` | 60 | Window for duplicate order detection | -| `symbol_allowlist` | _(empty = all allowed)_ | Only these symbols are tradeable | -| `symbol_blocklist` | _(empty = none blocked)_ | These symbols are blocked | - ## Workflows ### Research a trade @@ -71,13 +54,12 @@ broker quote AAPL broker history AAPL --period 30d --bar 1d broker chain AAPL --expiry 2026-03 --type call broker snapshot --symbols AAPL -broker check --side buy --symbol AAPL --qty 50 --limit 180 ``` ### Dry-run then place an order ```bash -# Test against risk limits without submitting +# Preview the order without submitting broker order buy AAPL 50 --limit 180 --dry-run \ --decision-name "AAPL Accumulation" \ --decision-summary "Adding to AAPL on pullback to 180 support" \ @@ -117,16 +99,6 @@ broker exposure --by sector broker pnl --today ``` -### Risk management - -```bash -broker limits # view current limits -broker set max_order_value 25000 # tighten a limit -broker override --param max_order_value --value 75000 --duration 1h --reason "large block trade" -broker halt # emergency: cancel all + reject new orders -broker resume # re-enable trading after halt -``` - --- ## Command Reference @@ -212,7 +184,7 @@ broker order buy SYMBOL QTY [--limit PRICE] [--stop PRICE] [--tif DAY|GTC|IOC] \ - `QTY`: must be > 0 (supports fractional) - `--tif`: default `DAY` -- `--dry-run`: evaluate against risk only, do not submit +- `--dry-run`: preview the order without submitting - `--idempotency-key`: stable key for safe retries (maps to `client_order_id`) - Decision flags are **required** for submitted orders (not required for `--dry-run`) @@ -323,66 +295,6 @@ broker exposure [--by symbol|sector|asset_class|currency] --- -### Risk - -#### `broker check` - -Dry-run an order against risk limits (no submission). - -```bash -broker check --side buy|sell --symbol SYMBOL --qty QTY [--limit PRICE] [--stop PRICE] [--tif DAY|GTC|IOC] -``` - -- `--side`, `--symbol`, `--qty` are required -- `--tif`: default `DAY` - -#### `broker limits` - -Show current runtime risk limits. - -```bash -broker limits -``` - -#### `broker set` - -Set a risk parameter at runtime. - -```bash -broker set PARAM VALUE -``` - -Valid params: `max_position_pct`, `max_order_value`, `max_daily_loss_pct`, `max_sector_exposure_pct`, `max_single_name_pct`, `max_open_orders`, `order_rate_limit`, `duplicate_window_seconds`, `symbol_allowlist`, `symbol_blocklist` - -#### `broker halt` - -Emergency halt — cancels all open orders and rejects new orders. - -```bash -broker halt -``` - -#### `broker resume` - -Resume trading after halt. - -```bash -broker resume -``` - -#### `broker override` - -Temporary risk override with required reason and duration. - -```bash -broker override --param PARAM --value VALUE --duration DURATION --reason TEXT -``` - -- All flags required. -- `--duration`: `30m`, `1h`, `1d` (or raw seconds) - ---- - ### Audit #### `broker audit orders` @@ -397,17 +309,11 @@ broker audit orders [--since YYYY-MM-DD] [--status active|filled|cancelled|all] broker audit commands [--source cli|sdk|ts_sdk] [--since YYYY-MM-DD] [--request-id ID] ``` -#### `broker audit risk` - -```bash -broker audit risk [--type EVENT_TYPE] -``` - #### `broker audit export` ```bash -broker audit export --output PATH [--format csv] [--table orders|commands|risk] \ - [--since YYYY-MM-DD] [--status STATUS] [--source SOURCE] [--request-id ID] [--type EVENT_TYPE] +broker audit export --output PATH [--format csv] [--table orders|commands] \ + [--since YYYY-MM-DD] [--status STATUS] [--source SOURCE] [--request-id ID] ``` - `--output`: required @@ -438,10 +344,6 @@ broker schema [COMMAND] | `DAEMON_NOT_RUNNING` | 3 | Run `broker daemon start` | | `IB_DISCONNECTED` | 4 | Verify IB Gateway/TWS is running, wait and retry | | `IB_REJECTED` | 1 | Check order params (symbol, qty, price) | -| `RISK_CHECK_FAILED` | 5 | Check `broker limits`, adjust order size/price or use `broker override` | -| `RISK_HALTED` | 6 | Trading is halted — review situation, then `broker resume` | -| `RATE_LIMITED` | 1 | Wait and retry (default: 10 orders/min) | -| `DUPLICATE_ORDER` | 1 | Identical order within duplicate window — wait or use different `--idempotency-key` | | `INVALID_SYMBOL` | 1 | Check symbol with `broker quote` | | `INVALID_ARGS` | 2 | Check command syntax with `-h` | | `TIMEOUT` | 10 | Retry the command | diff --git a/start.sh b/start.sh index 4f7a4b8..dbe5180 100755 --- a/start.sh +++ b/start.sh @@ -751,7 +751,6 @@ if [[ -n "${STATUS_JSON}" ]]; then echo "IB is not reachable on localhost:${PORT}." echo "Start IB Gateway and enable API socket access, or pass --gateway HOST:PORT." echo "Common Gateway ports: paper/live = 4002/4001." - echo "If risk_halted=true after reconnect, run: ${BROKER_BIN} resume" fi else "${BROKER_BIN}" daemon status diff --git a/website/app/page.tsx b/website/app/page.tsx index 54bac1a..175270d 100644 --- a/website/app/page.tsx +++ b/website/app/page.tsx @@ -7,7 +7,6 @@ import { Lock, Shuffle, BarChart3, - ShieldCheck, Copy, Check, FlaskConical, @@ -119,12 +118,6 @@ const features = [ "Option chains with greeks, expiry filtering, and strike ranges. Agents can evaluate and execute complex derivatives strategies.", icon: BarChart3, }, - { - title: "Risk Guardrails", - description: - "Exposure analysis by symbol, sector, or asset class. Cancel-all for instant flattening. Paper trading mode for safe development. Give agents power with built-in safety valves.", - icon: ShieldCheck, - }, ]; export default function Home() { @@ -184,7 +177,7 @@ export default function Home() {

broker-cli{" "} closes that gap. Install it, and your agent can check positions, - analyze risk, place orders, and manage a portfolio using the same + analyze exposure, place orders, and manage a portfolio using the same interface it uses for everything else:{" "} the terminal.

@@ -264,7 +257,7 @@ export default function Home() { Start safe: broker daemon start --paper - — after broker setup, use full paper trading mode. Test strategies with zero risk, go live + — after broker setup, use full paper trading mode. Test strategies with simulated funds, go live when ready. diff --git a/website/app/reference/page.tsx b/website/app/reference/page.tsx index e954a9e..5502857 100644 --- a/website/app/reference/page.tsx +++ b/website/app/reference/page.tsx @@ -104,7 +104,6 @@ const sections = [ { id: "market", label: "Market Data" }, { id: "orders", label: "Orders" }, { id: "portfolio", label: "Portfolio" }, - { id: "risk", label: "Risk" }, { id: "audit", label: "Audit" }, { id: "config", label: "Configuration" }, ]; @@ -210,9 +209,9 @@ export default function ReferencePage() { /> - - - {/* Risk */} -
- - - - - -
@@ -484,7 +425,7 @@ export default function ReferencePage() {
-