From 6b21b8b23c6d23fa4b8673c65f922760fcfc093d Mon Sep 17 00:00:00 2001 From: Bryce Bjork Date: Fri, 20 Feb 2026 12:10:01 -0500 Subject: [PATCH] Add fund repo sync and decision metadata Implement setup-driven fund repository initialization, required decision fields on order commands, daemon-based fill/decision sync with git push, and FUND format docs for dashboard ingestion. Co-authored-by: Bryce Bjork --- FUND.md | 148 ++++++++ README.md | 11 +- cli/src/orders.py | 90 +++++ cli/tests/test_cli/test_command_contracts.py | 54 ++- daemon/src/broker_daemon/config.py | 23 +- .../src/broker_daemon/daemon/order_manager.py | 72 +++- daemon/src/broker_daemon/daemon/server.py | 150 ++++++++ daemon/src/broker_daemon/models/orders.py | 3 + .../broker_daemon/observability/__init__.py | 5 + .../broker_daemon/observability/fund_sync.py | 324 ++++++++++++++++++ daemon/src/broker_daemon/providers/etrade.py | 12 + daemon/src/broker_daemon/providers/ib.py | 25 +- .../tests/test_daemon/test_config_loader.py | 19 + daemon/tests/test_daemon/test_fund_sync.py | 117 +++++++ install/steps/onboarding.sh | 317 +++++++++++++++++ install/steps/summary.sh | 35 ++ sdk/python/src/broker_sdk/client.py | 40 ++- sdk/typescript/src/client.ts | 23 +- sdk/typescript/src/commands.ts | 6 + sdk/typescript/src/sdk-types.ts | 8 + setup.sh | 3 + skills/{ => broker}/SKILL.md | 32 +- 22 files changed, 1485 insertions(+), 32 deletions(-) create mode 100644 FUND.md create mode 100644 daemon/src/broker_daemon/observability/__init__.py create mode 100644 daemon/src/broker_daemon/observability/fund_sync.py create mode 100644 daemon/tests/test_daemon/test_fund_sync.py rename skills/{ => broker}/SKILL.md (85%) diff --git a/FUND.md b/FUND.md new file mode 100644 index 0000000..0cf9369 --- /dev/null +++ b/FUND.md @@ -0,0 +1,148 @@ +# Fund Repository Format + +This document defines the exact on-disk shape of a broker-cli fund observability repository. + +The broker setup flow can create this repository automatically, and broker-daemon keeps it updated during trading. + +## Directory Layout + +```text +/ + config.json + fills.json + cash_events.json + decisions/ + .md +``` + +Notes: +- `` is configured in broker config at `broker.observability.fund_dir`. +- Decision filenames are timestamp IDs (for example: `20260220T153012123456Z.md`). + +## `config.json` + +Fund metadata and initialization parameters. + +Example: + +```json +{ + "name": "Atlas Fund", + "slug": "atlas", + "inception": "2026-02-20T15:12:30Z", + "currency": "USD", + "initialCapital": 100000.0, + "benchmarks": [], + "cashInterestPolicy": { + "enabled": true, + "source": "inferred_from_broker_cash_balance" + } +} +``` + +Fields: +- `name: string` +- `slug: string` +- `inception: string` (ISO-8601 timestamp) +- `currency: "USD"` +- `initialCapital: number` +- `benchmarks: string[]` +- `cashInterestPolicy.enabled: boolean` +- `cashInterestPolicy.source: string` + +## `fills.json` + +Append-only array of executed fills. + +Example entry: + +```json +{ + "id": "fill-001", + "symbol": "NVDA", + "side": "buy", + "qty": 50, + "price": 480.25, + "commission": 1.0, + "timestamp": "2026-02-20T15:16:07.184321+00:00", + "decisionId": "20260220T151530112233Z" +} +``` + +Fields: +- `id: string` (unique fill identifier, dedup key) +- `symbol: string` +- `side: "buy" | "sell"` +- `qty: number` +- `price: number` +- `commission: number` +- `timestamp: string` (ISO-8601 timestamp) +- `decisionId: string | null` + +## `cash_events.json` + +Append-only array of cash adjustments not represented by trade notional directly. + +Current event type: +- `interest` (cash interest inferred from broker cash balance reconciliation) + +Example entry: + +```json +{ + "id": "interest-20260220T160001987654Z", + "type": "interest", + "amount": 3.42, + "timestamp": "2026-02-20T16:00:01.987654+00:00", + "source": "inferred_from_broker_cash_balance" +} +``` + +Fields: +- `id: string` +- `type: string` (currently `"interest"`) +- `amount: number` (positive or negative) +- `timestamp: string` (ISO-8601 timestamp) +- `source: string` + +## `decisions/.md` + +Markdown decision files with YAML frontmatter. + +Example: + +```markdown +--- +date: 2026-02-20 +type: buy +tickers: [NVDA] +title: "Initiate NVDA Position" +summary: "Started a core position after earnings revision." +--- + +## Thesis + +Long-form markdown reasoning content... +``` + +Frontmatter fields: +- `date: string` (ISO date) +- `type: "buy" | "sell"` +- `tickers: string[]` +- `title: string` +- `summary: string` + +Body: +- Free-form markdown reasoning (from `--decision-reasoning`). + +## Sync Behavior + +When observability sync is enabled: +- Decision files are written when orders are placed. +- Fills are appended on executions. +- Cash interest events are appended when inferred deltas are detected. +- Changes are committed and pushed to `origin` automatically. + +Git behavior assumptions: +- `` is a git repository. +- `origin` remote exists and is pushable from the runtime environment. diff --git a/README.md b/README.md index 5f208e4..1828c46 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,10 @@ broker daemon status # Check connection broker quote AAPL MSFT # Get quotes broker positions # View portfolio broker exposure --by symbol # Exposure analysis -broker order buy AAPL 100 --limit 185 # Place an order +broker order buy AAPL 100 --limit 185 \ + --decision-name "Initiate AAPL Position" \ + --decision-summary "Open core position" \ + --decision-reasoning "## Thesis\nHigh-conviction setup." # Place an order ``` ## Built for Agents @@ -73,9 +76,9 @@ broker quote SYMBOL... Snapshot quotes broker watch SYMBOL Live quote stream broker chain SYMBOL Option chain with greeks broker history SYMBOL Historical bars -broker order buy SYMBOL QTY Buy order (market/limit/stop) -broker order sell SYMBOL QTY Sell order -broker order bracket SYMBOL QTY Bracket order (entry + TP + SL) +broker order buy SYMBOL QTY Buy order (requires --decision-name/--decision-summary/--decision-reasoning) +broker order sell SYMBOL QTY Sell order (requires --decision-name/--decision-summary/--decision-reasoning) +broker order bracket SYMBOL QTY Bracket order (entry + TP + SL, requires decision flags) broker order status ORDER_ID Order status broker orders List orders broker cancel ORDER_ID Cancel an order diff --git a/cli/src/orders.py b/cli/src/orders.py index abb39d6..2f3dc3a 100644 --- a/cli/src/orders.py +++ b/cli/src/orders.py @@ -34,6 +34,21 @@ def buy( "--idempotency-key", help="Stable key for safe retries (maps to client_order_id).", ), + decision_name: str = typer.Option( + ..., + "--decision-name", + help="Required title-case plain text decision title.", + ), + decision_summary: str = typer.Option( + ..., + "--decision-summary", + help="Required single-line plain text summary.", + ), + decision_reasoning: str = typer.Option( + ..., + "--decision-reasoning", + help="Required long-form markdown reasoning.", + ), ) -> None: _place( ctx, @@ -45,6 +60,9 @@ def buy( tif=tif, dry_run=dry_run, idempotency_key=idempotency_key, + decision_name=decision_name, + decision_summary=decision_summary, + decision_reasoning=decision_reasoning, ) @@ -62,6 +80,21 @@ def sell( "--idempotency-key", help="Stable key for safe retries (maps to client_order_id).", ), + decision_name: str = typer.Option( + ..., + "--decision-name", + help="Required title-case plain text decision title.", + ), + decision_summary: str = typer.Option( + ..., + "--decision-summary", + help="Required single-line plain text summary.", + ), + decision_reasoning: str = typer.Option( + ..., + "--decision-reasoning", + help="Required long-form markdown reasoning.", + ), ) -> None: _place( ctx, @@ -73,6 +106,9 @@ def sell( tif=tif, dry_run=dry_run, idempotency_key=idempotency_key, + decision_name=decision_name, + decision_summary=decision_summary, + decision_reasoning=decision_reasoning, ) @@ -86,9 +122,27 @@ def bracket( sl: float = typer.Option(..., "--sl", help="Stop-loss price."), side: Side = typer.Option(Side.BUY, "--side", case_sensitive=False, help="buy or sell."), tif: TIF = typer.Option(TIF.DAY, "--tif", case_sensitive=False, help="DAY, GTC, IOC."), + decision_name: str = typer.Option( + ..., + "--decision-name", + help="Required title-case plain text decision title.", + ), + decision_summary: str = typer.Option( + ..., + "--decision-summary", + help="Required single-line plain text summary.", + ), + decision_reasoning: str = typer.Option( + ..., + "--decision-reasoning", + help="Required long-form markdown reasoning.", + ), ) -> None: state = get_state(ctx) command = "order.bracket" + decision_name = _normalize_decision_name(decision_name) + decision_summary = _normalize_single_line(decision_summary, "decision summary") + decision_reasoning = _normalize_required_text(decision_reasoning, "decision reasoning") try: result = run_async( daemon_request( @@ -102,6 +156,9 @@ def bracket( "sl": sl, "side": side.value, "tif": tif.value, + "decision_name": decision_name, + "decision_summary": decision_summary, + "decision_reasoning": decision_reasoning, }, ) ) @@ -238,14 +295,23 @@ def _place( tif: TIF, dry_run: bool, idempotency_key: str | None, + decision_name: str, + decision_summary: str, + decision_reasoning: str, ) -> None: state = get_state(ctx) command = "order.place" + decision_name = _normalize_decision_name(decision_name) + decision_summary = _normalize_single_line(decision_summary, "decision summary") + decision_reasoning = _normalize_required_text(decision_reasoning, "decision reasoning") params: dict[str, object] = { "side": side, "symbol": symbol, "qty": qty, "tif": tif.value, + "decision_name": decision_name, + "decision_summary": decision_summary, + "decision_reasoning": decision_reasoning, } if limit is not None: params["limit"] = limit @@ -267,3 +333,27 @@ def _place( ) except BrokerError as exc: handle_error(exc, json_output=state.json_output, command=command, strict=state.strict) + + +def _normalize_required_text(value: str, label: str) -> str: + out = value.strip() + if not out: + raise typer.BadParameter(f"{label} is required") + return out + + +def _normalize_single_line(value: str, label: str) -> str: + out = _normalize_required_text(value, label) + if "\n" in out or "\r" in out: + raise typer.BadParameter(f"{label} must be single-line plain text") + return out + + +def _normalize_decision_name(value: str) -> str: + out = _normalize_single_line(value, "decision name") + words = [part for part in out.split(" ") if part] + if not words: + raise typer.BadParameter("decision name is required") + if not all(word[0].isupper() for word in words if word and word[0].isalpha()): + raise typer.BadParameter("decision name must be title case plain text") + return out diff --git a/cli/tests/test_cli/test_command_contracts.py b/cli/tests/test_cli/test_command_contracts.py index ba2cafd..a710a2a 100644 --- a/cli/tests/test_cli/test_command_contracts.py +++ b/cli/tests/test_cli/test_command_contracts.py @@ -159,9 +159,57 @@ def test_subcommand_surface_contract( (["capabilities"], "market.capabilities"), (["chain", "AAPL"], "market.chain"), (["history", "AAPL", "--period", "1d", "--bar", "1m"], "market.history"), - (["order", "buy", "AAPL", "1"], "order.place"), - (["order", "sell", "AAPL", "1"], "order.place"), - (["order", "bracket", "AAPL", "1", "--entry", "100", "--tp", "120", "--sl", "95"], "order.bracket"), + ( + [ + "order", + "buy", + "AAPL", + "1", + "--decision-name", + "Initiate AAPL Position", + "--decision-summary", + "Start core position", + "--decision-reasoning", + "## Thesis\nBuy quality growth.", + ], + "order.place", + ), + ( + [ + "order", + "sell", + "AAPL", + "1", + "--decision-name", + "Trim AAPL Position", + "--decision-summary", + "Reduce exposure", + "--decision-reasoning", + "## Thesis\nLock gains.", + ], + "order.place", + ), + ( + [ + "order", + "bracket", + "AAPL", + "1", + "--entry", + "100", + "--tp", + "120", + "--sl", + "95", + "--decision-name", + "Open AAPL Bracket", + "--decision-summary", + "Enter with defined risk", + "--decision-reasoning", + "## Plan\nUse bracket controls.", + ], + "order.bracket", + ), (["order", "status", "cid-1"], "order.status"), (["orders"], "orders.list"), (["cancel", "cid-1"], "order.cancel"), diff --git a/daemon/src/broker_daemon/config.py b/daemon/src/broker_daemon/config.py index 356ee4f..4003e63 100644 --- a/daemon/src/broker_daemon/config.py +++ b/daemon/src/broker_daemon/config.py @@ -118,6 +118,20 @@ def _normalize_probe_symbols(cls, value: Any) -> list[str]: return normalized or ["AAPL"] +class ObservabilityConfig(BaseModel): + fund_dir: Path | None = None + auto_sync: bool = True + auto_push: bool = True + etrade_fill_poll_seconds: int = 10 + + @field_validator("etrade_fill_poll_seconds") + @classmethod + def _validate_etrade_fill_poll_seconds(cls, value: int) -> int: + if value < 1: + raise ValueError("etrade_fill_poll_seconds must be >= 1") + return value + + class AppConfig(BaseModel): provider: str = "ib" gateway: GatewayConfig = Field(default_factory=GatewayConfig) @@ -128,6 +142,7 @@ class AppConfig(BaseModel): output: OutputConfig = Field(default_factory=OutputConfig) runtime: RuntimeConfig = Field(default_factory=RuntimeConfig) market_data: MarketDataConfig = Field(default_factory=MarketDataConfig) + observability: ObservabilityConfig = Field(default_factory=ObservabilityConfig) @field_validator("provider") @classmethod @@ -144,6 +159,8 @@ def expanded(self) -> "AppConfig": clone.logging.log_file = clone.logging.log_file.expanduser() clone.runtime.socket_path = clone.runtime.socket_path.expanduser() clone.runtime.pid_file = clone.runtime.pid_file.expanduser() + if clone.observability.fund_dir: + clone.observability.fund_dir = clone.observability.fund_dir.expanduser() return clone def ensure_dirs(self) -> None: @@ -153,6 +170,8 @@ def ensure_dirs(self) -> None: expanded.logging.audit_db.parent.mkdir(parents=True, exist_ok=True) expanded.logging.log_file.parent.mkdir(parents=True, exist_ok=True) expanded.etrade.token_path.parent.mkdir(parents=True, exist_ok=True) + if expanded.observability.fund_dir: + expanded.observability.fund_dir.mkdir(parents=True, exist_ok=True) def _coerce_env_value(value: str) -> Any: @@ -192,7 +211,7 @@ def _read_broker_json(path: Path) -> dict[str, Any]: def _extract_broker_config(data: dict[str, Any]) -> dict[str, Any]: out: dict[str, Any] = {} raw_broker = data.get("broker") - sections = {"gateway", "etrade", "risk", "logging", "agent", "output", "runtime", "market_data"} + sections = {"gateway", "etrade", "risk", "logging", "agent", "output", "runtime", "market_data", "observability"} if isinstance(raw_broker, dict): provider = raw_broker.get("provider") @@ -215,7 +234,7 @@ def _extract_broker_config(data: dict[str, Any]) -> dict[str, Any]: def _apply_env_overrides(data: dict[str, Any]) -> dict[str, Any]: result = dict(data) - sections = {"gateway", "etrade", "risk", "logging", "agent", "output", "runtime", "market_data"} + sections = {"gateway", "etrade", "risk", "logging", "agent", "output", "runtime", "market_data", "observability"} for key, raw in os.environ.items(): if key == "BROKER_PROVIDER": result["provider"] = raw.strip() diff --git a/daemon/src/broker_daemon/daemon/order_manager.py b/daemon/src/broker_daemon/daemon/order_manager.py index 3becc31..94e95df 100644 --- a/daemon/src/broker_daemon/daemon/order_manager.py +++ b/daemon/src/broker_daemon/daemon/order_manager.py @@ -8,7 +8,8 @@ from broker_daemon.audit.logger import AuditLogger from broker_daemon.models.events import Event, EventTopic -from broker_daemon.models.orders import FillRecord, OrderRecord, OrderRequest, OrderStatus, OrderType +from broker_daemon.models.orders import FillRecord, OrderRecord, OrderRequest, OrderStatus, OrderType, Side +from broker_daemon.observability.fund_sync import FundSyncService from broker_daemon.providers import BrokerProvider from broker_daemon.risk.engine import RiskContext, RiskEngine @@ -28,13 +29,16 @@ def __init__( risk: RiskEngine, audit: AuditLogger, event_cb: Callable[[Event], Awaitable[None]] | None = None, + fund_sync: FundSyncService | None = None, ) -> None: self._provider = provider self._risk = risk self._audit = audit self._event_cb = event_cb + self._fund_sync = fund_sync self._orders: dict[str, OrderRecord] = {} self._fills: list[FillRecord] = [] + self._fill_ids_seen: set[str] = set() def _infer_order_type(self, req: OrderRequest) -> OrderType: if req.limit is not None and req.stop is not None: @@ -94,6 +98,7 @@ async def place_order(self, request: OrderRequest) -> OrderRecord: tif=request.tif, status=OrderStatus.PENDING_SUBMIT, risk_check_result=risk_result.model_dump(mode="json"), + tags=dict(request.tags), ) broker_res = await self._provider.place_order(request, client_order_id) @@ -115,6 +120,21 @@ async def place_order(self, request: OrderRequest) -> OrderRecord: }, ) ) + + if self._fund_sync: + decision_id = _as_non_empty_string(request.tags.get("decision_id")) + decision_name = _as_non_empty_string(request.tags.get("decision_name")) + decision_summary = _as_non_empty_string(request.tags.get("decision_summary")) + decision_reasoning = _as_non_empty_string(request.tags.get("decision_reasoning")) + if decision_id and decision_name and decision_summary and decision_reasoning: + await self._fund_sync.sync_decision( + decision_id=decision_id, + symbol=request.symbol, + side=request.side, + title=decision_name, + summary=decision_summary, + reasoning_markdown=decision_reasoning, + ) return record async def place_bracket( @@ -127,6 +147,7 @@ async def place_bracket( tp: float, sl: float, tif: str, + decision: dict[str, str] | None = None, ) -> dict[str, Any]: request = OrderRequest(side=side, symbol=symbol, qty=qty, limit=entry, tif=tif) context = await self._risk_context() @@ -154,7 +175,24 @@ async def place_bracket( }, ) ) - return {"client_order_id": client_order_id, **result} + if self._fund_sync and isinstance(decision, dict): + decision_id = _as_non_empty_string(decision.get("decision_id")) + decision_name = _as_non_empty_string(decision.get("decision_name")) + decision_summary = _as_non_empty_string(decision.get("decision_summary")) + decision_reasoning = _as_non_empty_string(decision.get("decision_reasoning")) + if decision_id and decision_name and decision_summary and decision_reasoning: + await self._fund_sync.sync_decision( + decision_id=decision_id, + symbol=symbol, + side=Side.BUY if str(side).lower() != "sell" else Side.SELL, + title=decision_name, + summary=decision_summary, + reasoning_markdown=decision_reasoning, + ) + out = {"client_order_id": client_order_id, **result} + if decision: + out["decision"] = dict(decision) + return out async def update_order_status( self, @@ -175,8 +213,23 @@ async def update_order_status( await self._audit.upsert_order(record) async def add_fill(self, fill: FillRecord) -> None: + if fill.fill_id: + if fill.fill_id in self._fill_ids_seen: + return + self._fill_ids_seen.add(fill.fill_id) + + if fill.side is None: + order = self._orders.get(fill.client_order_id) + if order: + fill.side = order.side + decision_id = _as_non_empty_string(order.tags.get("decision_id")) + if decision_id: + fill.decision_id = decision_id + self._fills.append(fill) await self._audit.log_fill(fill) + if self._fund_sync: + await self._fund_sync.sync_fill(fill) await self._emit(Event(topic=EventTopic.FILLS, payload=fill.model_dump(mode="json"))) async def cancel_order(self, order_id: str) -> dict[str, Any]: @@ -220,7 +273,14 @@ async def list_fills(self, symbol: str | None = None) -> list[dict[str, Any]]: broker_fills = await self._provider.fills() for fill in broker_fills: await self._audit.log_fill(fill) - combined = [*self._fills, *broker_fills] + combined: list[FillRecord] = [] + seen_ids: set[str] = set() + for fill in [*self._fills, *broker_fills]: + if fill.fill_id: + if fill.fill_id in seen_ids: + continue + seen_ids.add(fill.fill_id) + combined.append(fill) if symbol: combined = [f for f in combined if f.symbol.upper() == symbol.upper()] return [f.model_dump(mode="json") for f in combined] @@ -244,3 +304,9 @@ def _status_from_ib(raw: str) -> OrderStatus: "rejected": OrderStatus.REJECTED, } return mapping.get(normalized, OrderStatus.SUBMITTED) + + +def _as_non_empty_string(value: Any) -> str: + if isinstance(value, str): + return value.strip() + return "" diff --git a/daemon/src/broker_daemon/daemon/server.py b/daemon/src/broker_daemon/daemon/server.py index ae9eaf0..85ac80c 100644 --- a/daemon/src/broker_daemon/daemon/server.py +++ b/daemon/src/broker_daemon/daemon/server.py @@ -25,6 +25,7 @@ from broker_daemon.models.events import Event, EventTopic from broker_daemon.models.market import QUOTE_INTENTS, OptionChainEntry from broker_daemon.models.orders import FillRecord, OrderRequest +from broker_daemon.observability import FundSyncService from broker_daemon.protocol import ErrorResponse, EventEnvelope, Request, Response, decode_request, encode_model, frame_payload, read_framed from broker_daemon.providers import IBProvider from broker_daemon.risk.engine import RiskEngine @@ -94,17 +95,20 @@ def __init__(self, cfg: AppConfig) -> None: else: self._provider = IBProvider(cfg.gateway, audit=self._audit, event_cb=self._on_broker_event) + self._fund_sync = FundSyncService(cfg.observability, provider=self._provider) self._market_data = MarketDataService(self._provider, settings=cfg.market_data) self._orders = OrderManager( provider=self._provider, risk=self._risk, audit=self._audit, event_cb=self._broadcast_event, + fund_sync=self._fund_sync, ) self._server: asyncio.AbstractServer | None = None self._subscribers: list[Subscriber] = [] self._monitor_task: asyncio.Task[None] | None = None + self._fills_reconcile_task: asyncio.Task[None] | None = None @property def socket_path(self) -> Path: @@ -125,6 +129,8 @@ async def start(self) -> None: self._cfg.runtime.pid_file.write_text(str(os.getpid()), encoding="utf-8") self._monitor_task = asyncio.create_task(self._monitor_loop()) + if self._cfg.provider == "etrade" and self._fund_sync.enabled: + self._fills_reconcile_task = asyncio.create_task(self._fills_reconcile_loop()) await self._audit.log_connection_event("daemon_started", {"socket": str(self.socket_path)}) async def serve(self) -> None: @@ -143,6 +149,9 @@ async def stop(self) -> None: if self._monitor_task: self._monitor_task.cancel() self._monitor_task = None + if self._fills_reconcile_task: + self._fills_reconcile_task.cancel() + self._fills_reconcile_task = None for sub in list(self._subscribers): sub.writer.close() @@ -477,6 +486,10 @@ async def _dispatch(self, request: Request) -> dict[str, Any]: idempotency_key = raw.pop("idempotency_key", None) if idempotency_key and not raw.get("client_order_id"): raw["client_order_id"] = str(idempotency_key) + if not dry_run: + decision_tags = _extract_decision_tags(raw, required=True) + existing_tags = raw.get("tags") if isinstance(raw.get("tags"), dict) else {} + raw["tags"] = {**existing_tags, **decision_tags} req = OrderRequest.model_validate(raw) @@ -515,6 +528,7 @@ async def _dispatch(self, request: Request) -> dict[str, Any]: if cmd == "order.bracket": self._require_capability("bracket_orders", "bracket orders") + decision_tags = _extract_decision_tags(p, required=True) res = await self._orders.place_bracket( side=str(p.get("side", "buy")), symbol=str(p["symbol"]), @@ -523,6 +537,7 @@ async def _dispatch(self, request: Request) -> dict[str, Any]: tp=float(p["tp"]), sl=float(p["sl"]), tif=str(p.get("tif", "DAY")), + decision=decision_tags, ) return res @@ -714,8 +729,12 @@ async def _on_broker_event(self, event: Event) -> None: client_order_id=str(event.payload.get("client_order_id") or ""), ib_order_id=_maybe_int(event.payload.get("ib_order_id")), symbol=str(symbol), + side=_maybe_side(event.payload.get("side")), qty=float(event.payload.get("qty") or 0.0), price=float(event.payload.get("price") or 0.0), + commission=_maybe_float(event.payload.get("commission")), + timestamp=_maybe_datetime(event.payload.get("timestamp")) or datetime.now(UTC), + decision_id=_maybe_string(event.payload.get("decision_id")), ) await self._orders.add_fill(fill) @@ -781,6 +800,17 @@ async def _monitor_loop(self) -> None: except Exception: logger.debug("drawdown monitor skipped due to transient error", exc_info=True) + async def _fills_reconcile_loop(self) -> None: + interval = self._cfg.observability.etrade_fill_poll_seconds + while not self._shutdown.is_set(): + await asyncio.sleep(interval) + try: + fills = await self._provider.fills() + for fill in fills: + await self._orders.add_fill(fill) + except Exception: + logger.exception("fills reconcile loop failed") + def _parse_strike_range(raw: Any) -> tuple[float, float] | None: if raw is None: @@ -1109,6 +1139,9 @@ def _command_schema_registry() -> dict[str, dict[str, Any]]: "client_order_id": {"type": "string"}, "idempotency_key": {"type": "string"}, "dry_run": {"type": "boolean"}, + "decision_name": {"type": "string"}, + "decision_summary": {"type": "string"}, + "decision_reasoning": {"type": "string"}, }, "required": ["side", "symbol", "qty"], }, @@ -1125,6 +1158,27 @@ def _command_schema_registry() -> dict[str, dict[str, Any]]: }, } + base["order.bracket"] = { + "params": { + "type": "object", + "additionalProperties": False, + "properties": { + "side": {"enum": ["buy", "sell"]}, + "symbol": {"type": "string"}, + "qty": {"type": "number", "exclusiveMinimum": 0}, + "entry": {"type": "number"}, + "tp": {"type": "number"}, + "sl": {"type": "number"}, + "tif": {"enum": ["DAY", "GTC", "IOC"]}, + "decision_name": {"type": "string"}, + "decision_summary": {"type": "string"}, + "decision_reasoning": {"type": "string"}, + }, + "required": ["symbol", "qty", "entry", "tp", "sl", "decision_name", "decision_summary", "decision_reasoning"], + }, + "result": {"type": "object", "additionalProperties": True}, + } + base["risk.check"] = { "params": { "type": "object", @@ -1217,6 +1271,102 @@ def _maybe_int(value: Any) -> int | None: return None +def _maybe_datetime(value: Any) -> datetime | None: + if isinstance(value, datetime): + return value + if not isinstance(value, str): + return None + raw = value.strip() + if not raw: + return None + try: + return datetime.fromisoformat(raw.replace("Z", "+00:00")) + except Exception: + return None + + +def _maybe_side(value: Any) -> str | None: + if value is None: + return None + normalized = str(value).strip().lower() + if normalized in {"buy", "bot", "b"}: + return "buy" + if normalized in {"sell", "sld", "s"}: + return "sell" + return None + + +def _maybe_string(value: Any) -> str | None: + if not isinstance(value, str): + return None + out = value.strip() + return out or None + + +def _extract_decision_tags(params: dict[str, Any], *, required: bool) -> dict[str, str]: + name = _maybe_string(params.get("decision_name")) + summary = _maybe_string(params.get("decision_summary")) + reasoning = _maybe_string(params.get("decision_reasoning")) + + if required: + if not name: + raise BrokerError( + ErrorCode.INVALID_ARGS, + "decision_name is required when placing an order", + suggestion="Provide --decision-name with title-case plain text.", + ) + if not summary: + raise BrokerError( + ErrorCode.INVALID_ARGS, + "decision_summary is required when placing an order", + suggestion="Provide --decision-summary with a one-line plain-text summary.", + ) + if not reasoning: + raise BrokerError( + ErrorCode.INVALID_ARGS, + "decision_reasoning is required when placing an order", + suggestion="Provide --decision-reasoning with markdown rationale.", + ) + if not name or not summary or not reasoning: + return {} + + if "\n" in name or "\r" in name: + raise BrokerError( + ErrorCode.INVALID_ARGS, + "decision_name must be plain text on a single line", + suggestion="Use title-case text, e.g. 'Initiate NVDA Position'.", + ) + if "\n" in summary or "\r" in summary: + raise BrokerError( + ErrorCode.INVALID_ARGS, + "decision_summary must be plain text on a single line", + ) + if not _is_title_case(name): + raise BrokerError( + ErrorCode.INVALID_ARGS, + "decision_name must be title case plain text", + suggestion="Example: 'Add To Core Position'.", + ) + + return { + "decision_id": _decision_timestamp_id(), + "decision_name": name, + "decision_summary": summary, + "decision_reasoning": reasoning, + } + + +def _is_title_case(value: str) -> bool: + words = [part for part in value.split(" ") if part] + if not words: + return False + return all(word[0].isupper() for word in words if word and word[0].isalpha()) + + +def _decision_timestamp_id() -> str: + return datetime.now(UTC).strftime("%Y%m%dT%H%M%S%fZ") + + async def _safe_wait_closed(writer: asyncio.StreamWriter) -> None: try: await writer.wait_closed() diff --git a/daemon/src/broker_daemon/models/orders.py b/daemon/src/broker_daemon/models/orders.py index f09e727..774c046 100644 --- a/daemon/src/broker_daemon/models/orders.py +++ b/daemon/src/broker_daemon/models/orders.py @@ -72,6 +72,7 @@ class OrderRecord(BaseModel): fill_qty: float = 0 commission: float | None = None risk_check_result: dict[str, Any] = Field(default_factory=dict) + tags: dict[str, Any] = Field(default_factory=dict) class FillRecord(BaseModel): @@ -79,7 +80,9 @@ class FillRecord(BaseModel): client_order_id: str ib_order_id: int | None symbol: str + side: Side | None = None qty: float price: float commission: float | None = None timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) + decision_id: str | None = None diff --git a/daemon/src/broker_daemon/observability/__init__.py b/daemon/src/broker_daemon/observability/__init__.py new file mode 100644 index 0000000..c936f55 --- /dev/null +++ b/daemon/src/broker_daemon/observability/__init__.py @@ -0,0 +1,5 @@ +"""Fund repository sync helpers for external observability dashboards.""" + +from broker_daemon.observability.fund_sync import FundSyncService + +__all__ = ["FundSyncService"] diff --git a/daemon/src/broker_daemon/observability/fund_sync.py b/daemon/src/broker_daemon/observability/fund_sync.py new file mode 100644 index 0000000..b44aff3 --- /dev/null +++ b/daemon/src/broker_daemon/observability/fund_sync.py @@ -0,0 +1,324 @@ +"""Sync decisions and fills into a git-backed fund observability repository.""" + +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime +import json +import logging +from pathlib import Path +import subprocess +from typing import Any + +from broker_daemon.config import ObservabilityConfig +from broker_daemon.models.orders import FillRecord, Side +from broker_daemon.providers.base import BrokerProvider + +logger = logging.getLogger(__name__) + +FUND_CONFIG = "config.json" +FUND_FILLS = "fills.json" +FUND_CASH_EVENTS = "cash_events.json" +FUND_DECISIONS_DIR = "decisions" + + +def _utc_now() -> datetime: + return datetime.now(UTC) + + +def _iso_utc(ts: datetime | None = None) -> str: + return (ts or _utc_now()).isoformat() + + +def _decision_timestamp_id(ts: datetime | None = None) -> str: + return (ts or _utc_now()).strftime("%Y%m%dT%H%M%S%fZ") + + +def _safe_float(value: Any, *, default: float = 0.0) -> float: + try: + return float(value) + except Exception: + return default + + +def _yaml_quoted(value: str) -> str: + # JSON quoted strings are valid YAML scalar strings. + return json.dumps(value) + + +class FundSyncService: + """Handles append-only observability file writes and git push behavior.""" + + def __init__(self, cfg: ObservabilityConfig, *, provider: BrokerProvider | None = None) -> None: + self._cfg = cfg + self._provider = provider + self._fund_dir = cfg.fund_dir + self._lock = asyncio.Lock() + + @property + def enabled(self) -> bool: + return bool(self._cfg.auto_sync and self._fund_dir) + + async def sync_decision( + self, + *, + decision_id: str, + symbol: str, + side: Side, + title: str, + summary: str, + reasoning_markdown: str, + created_at: datetime | None = None, + ) -> None: + if not self.enabled: + return + assert self._fund_dir is not None + + try: + async with self._lock: + self._ensure_repo_layout() + decision_file = self._fund_dir / FUND_DECISIONS_DIR / f"{decision_id}.md" + body = self._decision_markdown( + symbol=symbol, + side=side, + title=title, + summary=summary, + reasoning_markdown=reasoning_markdown, + created_at=created_at, + ) + changed = self._write_if_changed(decision_file, body) + if changed: + await self._commit_and_push( + message=f"decision: {decision_id}", + changed_paths=[decision_file], + ) + except Exception: + logger.exception("fund sync: failed to sync decision %s", decision_id) + + async def sync_fill(self, fill: FillRecord) -> None: + if not self.enabled: + return + assert self._fund_dir is not None + + try: + async with self._lock: + self._ensure_repo_layout() + fills_path = self._fund_dir / FUND_FILLS + rows = self._read_json_array(fills_path) + if any(str(row.get("id", "")).strip() == fill.fill_id for row in rows): + return + + side = self._normalize_side(fill.side) + rows.append( + { + "id": fill.fill_id, + "symbol": fill.symbol, + "side": side, + "qty": float(fill.qty), + "price": float(fill.price), + "commission": float(fill.commission) if fill.commission is not None else 0.0, + "timestamp": _iso_utc(fill.timestamp), + "decisionId": fill.decision_id, + } + ) + self._write_json_atomic(fills_path, rows) + + changed_paths = [fills_path] + if await self._sync_interest_from_balance_locked(): + changed_paths.append(self._fund_dir / FUND_CASH_EVENTS) + + await self._commit_and_push( + message=f"fill: {fill.fill_id}", + changed_paths=changed_paths, + ) + except Exception: + logger.exception("fund sync: failed to sync fill %s", fill.fill_id) + + def _ensure_repo_layout(self) -> None: + assert self._fund_dir is not None + self._fund_dir.mkdir(parents=True, exist_ok=True) + (self._fund_dir / FUND_DECISIONS_DIR).mkdir(parents=True, exist_ok=True) + self._ensure_json_array_file(self._fund_dir / FUND_FILLS) + self._ensure_json_array_file(self._fund_dir / FUND_CASH_EVENTS) + + def _ensure_json_array_file(self, path: Path) -> None: + if path.exists(): + return + self._write_json_atomic(path, []) + + def _read_json_array(self, path: Path) -> list[dict[str, Any]]: + if not path.exists(): + return [] + try: + loaded = json.loads(path.read_text(encoding="utf-8")) + except Exception: + return [] + if not isinstance(loaded, list): + return [] + return [row for row in loaded if isinstance(row, dict)] + + def _read_json_object(self, path: Path) -> dict[str, Any]: + if not path.exists(): + return {} + try: + loaded = json.loads(path.read_text(encoding="utf-8")) + except Exception: + return {} + if not isinstance(loaded, dict): + return {} + return loaded + + def _write_if_changed(self, path: Path, content: str) -> bool: + existing = "" + if path.exists(): + existing = path.read_text(encoding="utf-8") + if existing == content: + return False + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(content, encoding="utf-8") + return True + + def _write_json_atomic(self, path: Path, payload: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp_path = path.with_name(f".{path.name}.tmp") + tmp_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") + tmp_path.replace(path) + + def _decision_markdown( + self, + *, + symbol: str, + side: Side, + title: str, + summary: str, + reasoning_markdown: str, + created_at: datetime | None = None, + ) -> str: + ts = created_at or _utc_now() + date_text = ts.date().isoformat() + reasoning = reasoning_markdown.strip() + if not reasoning: + reasoning = "_No decision reasoning provided._" + return ( + "---\n" + f"date: {date_text}\n" + f"type: {side.value}\n" + f"tickers: [{symbol.upper()}]\n" + f"title: {_yaml_quoted(title.strip())}\n" + f"summary: {_yaml_quoted(summary.strip())}\n" + "---\n\n" + f"{reasoning}\n" + ) + + def _normalize_side(self, side: Side | None) -> str: + if side == Side.SELL: + return "sell" + return "buy" + + async def _sync_interest_from_balance_locked(self) -> bool: + if self._provider is None: + return False + assert self._fund_dir is not None + + try: + balance = await self._provider.balance() + except Exception: + return False + + if balance.cash is None: + return False + + config_path = self._fund_dir / FUND_CONFIG + config = self._read_json_object(config_path) + initial_capital = _safe_float(config.get("initialCapital"), default=0.0) + + fills = self._read_json_array(self._fund_dir / FUND_FILLS) + trade_cash_delta = 0.0 + for row in fills: + qty = _safe_float(row.get("qty")) + price = _safe_float(row.get("price")) + commission = _safe_float(row.get("commission")) + side = str(row.get("side", "")).strip().lower() + if side == "sell": + trade_cash_delta += qty * price + else: + trade_cash_delta -= qty * price + trade_cash_delta -= commission + + expected_cash_without_interest = initial_capital + trade_cash_delta + inferred_total_interest = float(balance.cash) - expected_cash_without_interest + + cash_events_path = self._fund_dir / FUND_CASH_EVENTS + cash_events = self._read_json_array(cash_events_path) + known_interest = sum( + _safe_float(event.get("amount")) + for event in cash_events + if str(event.get("type", "")).strip().lower() == "interest" + ) + delta_interest = round(inferred_total_interest - known_interest, 2) + if abs(delta_interest) < 0.01: + return False + + ts = _utc_now() + cash_events.append( + { + "id": f"interest-{_decision_timestamp_id(ts)}", + "type": "interest", + "amount": delta_interest, + "timestamp": _iso_utc(ts), + "source": "inferred_from_broker_cash_balance", + } + ) + self._write_json_atomic(cash_events_path, cash_events) + return True + + async def _commit_and_push(self, *, message: str, changed_paths: list[Path]) -> None: + if not changed_paths: + return + if not self._is_git_repo(): + logger.warning("fund sync: %s is not a git repository; skipping commit/push", self._fund_dir) + return + + rel_paths = [str(path.relative_to(self._fund_dir)) for path in changed_paths if path.exists()] + if not rel_paths: + return + + await asyncio.to_thread(self._run_git_checked, "add", "--", *rel_paths) + staged = await asyncio.to_thread(self._run_git, "diff", "--cached", "--quiet", "--") + if staged.returncode == 0: + return + if staged.returncode not in {0, 1}: + raise RuntimeError(f"git diff --cached failed: {staged.stderr.strip()}") + + await asyncio.to_thread(self._run_git_checked, "commit", "-m", message) + + if not self._cfg.auto_push: + return + + has_origin = await asyncio.to_thread(self._run_git, "remote", "get-url", "origin") + if has_origin.returncode != 0: + logger.warning("fund sync: remote 'origin' is not configured; skipping push") + return + await asyncio.to_thread(self._run_git_checked, "push", "origin", "HEAD") + + def _is_git_repo(self) -> bool: + result = self._run_git("rev-parse", "--is-inside-work-tree") + return result.returncode == 0 and result.stdout.strip() == "true" + + def _run_git_checked(self, *args: str) -> subprocess.CompletedProcess[str]: + result = self._run_git(*args) + if result.returncode != 0: + stderr = result.stderr.strip() + stdout = result.stdout.strip() + details = stderr or stdout or "unknown git error" + raise RuntimeError(f"git {' '.join(args)} failed: {details}") + return result + + def _run_git(self, *args: str) -> subprocess.CompletedProcess[str]: + assert self._fund_dir is not None + return subprocess.run( + ["git", "-C", str(self._fund_dir), *args], + check=False, + capture_output=True, + text=True, + ) diff --git a/daemon/src/broker_daemon/providers/etrade.py b/daemon/src/broker_daemon/providers/etrade.py index dba1807..30ef25c 100644 --- a/daemon/src/broker_daemon/providers/etrade.py +++ b/daemon/src/broker_daemon/providers/etrade.py @@ -694,6 +694,7 @@ async def fills(self) -> list[FillRecord]: client_order_id=str(parsed["client_order_id"] or ""), ib_order_id=_as_int(parsed["order_id"]), symbol=str(parsed["symbol"] or ""), + side=_normalize_side(parsed["action"]), qty=qty, price=float(parsed["avg_fill_price"] or 0.0), timestamp=datetime.now(UTC), @@ -1439,6 +1440,17 @@ def _order_action(side: str) -> str: return mapping.get(side.lower(), side.upper()) +def _normalize_side(action: Any) -> str | None: + if action is None: + return None + normalized = str(action).strip().lower() + if normalized in {"buy", "buy_open", "buy_to_open", "buy_to_cover"}: + return "buy" + if normalized in {"sell", "sell_short", "sell_to_open", "sell_to_close"}: + return "sell" + return None + + def _order_term(tif: str) -> str: mapping = { "DAY": "GOOD_FOR_DAY", diff --git a/daemon/src/broker_daemon/providers/ib.py b/daemon/src/broker_daemon/providers/ib.py index 2321daf..7bb9862 100644 --- a/daemon/src/broker_daemon/providers/ib.py +++ b/daemon/src/broker_daemon/providers/ib.py @@ -239,13 +239,20 @@ def _on_exec_details(self, *args: Any) -> None: if len(args) < 2: return trade, fill = args[0], args[1] + execution = getattr(fill, "execution", None) + commission_report = getattr(fill, "commissionReport", None) + execution_side = getattr(execution, "side", None) + order_action = getattr(getattr(trade, "order", None), "action", None) payload = { "ib_order_id": getattr(getattr(trade, "order", None), "orderId", None), "client_order_id": getattr(getattr(trade, "order", None), "orderRef", None), "symbol": getattr(getattr(fill, "contract", None), "symbol", None), - "qty": getattr(getattr(fill, "execution", None), "shares", None), - "price": getattr(getattr(fill, "execution", None), "price", None), - "fill_id": getattr(getattr(fill, "execution", None), "execId", None), + "side": _normalize_side(execution_side or order_action), + "qty": getattr(execution, "shares", None), + "price": getattr(execution, "price", None), + "fill_id": getattr(execution, "execId", None), + "timestamp": getattr(fill, "time", datetime.now(UTC)), + "commission": _to_float_or_none(getattr(commission_report, "commission", None)), } asyncio.create_task(self._event_cb(Event(topic=EventTopic.FILLS, payload=payload))) @@ -872,6 +879,7 @@ async def fills(self) -> list[FillRecord]: client_order_id=getattr(execution, "orderRef", ""), ib_order_id=getattr(execution, "orderId", None), symbol=getattr(contract, "symbol", ""), + side=_normalize_side(getattr(execution, "side", None)), qty=float(getattr(execution, "shares", 0.0)), price=float(getattr(execution, "price", 0.0)), commission=_to_float_or_none(getattr(commission_report, "commission", None)), @@ -898,6 +906,17 @@ def _to_float_or_none(value: Any) -> float | None: return out +def _normalize_side(value: Any) -> str | None: + if value is None: + return None + normalized = str(value).strip().lower() + if normalized in {"buy", "bot", "b"}: + return "buy" + if normalized in {"sell", "sld", "s"}: + return "sell" + return None + + def _read_account_value(by_tag: dict[str, str], tag: str) -> float | None: keys = [f"{tag}:USD", f"{tag}:BASE", tag] for key in keys: diff --git a/daemon/tests/test_daemon/test_config_loader.py b/daemon/tests/test_daemon/test_config_loader.py index 587b99b..b40de55 100644 --- a/daemon/tests/test_daemon/test_config_loader.py +++ b/daemon/tests/test_daemon/test_config_loader.py @@ -103,3 +103,22 @@ def test_load_config_supports_market_data_overrides(tmp_path: Path, monkeypatch) assert cfg.market_data.quote_intent_default == "last_only" assert cfg.market_data.probe_symbols == ["AAPL", "MSFT"] assert cfg.market_data.capability_ttl_seconds == 42 + + +def test_load_config_supports_observability_overrides(tmp_path: Path, monkeypatch) -> None: + _set_runtime_env(monkeypatch, tmp_path) + monkeypatch.setenv("BROKER_OBSERVABILITY_FUND_DIR", str(tmp_path / "fund-atlas")) + monkeypatch.setenv("BROKER_OBSERVABILITY_AUTO_SYNC", "true") + monkeypatch.setenv("BROKER_OBSERVABILITY_AUTO_PUSH", "true") + monkeypatch.setenv("BROKER_OBSERVABILITY_ETRADE_FILL_POLL_SECONDS", "17") + + broker_json = tmp_path / "config.json" + broker_json.write_text("{}", encoding="utf-8") + monkeypatch.setattr(broker_config, "DEFAULT_BROKER_CONFIG_JSON", broker_json) + + cfg = broker_config.load_config() + + assert cfg.observability.fund_dir == (tmp_path / "fund-atlas") + assert cfg.observability.auto_sync is True + assert cfg.observability.auto_push is True + assert cfg.observability.etrade_fill_poll_seconds == 17 diff --git a/daemon/tests/test_daemon/test_fund_sync.py b/daemon/tests/test_daemon/test_fund_sync.py new file mode 100644 index 0000000..8b06916 --- /dev/null +++ b/daemon/tests/test_daemon/test_fund_sync.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from broker_daemon.config import ObservabilityConfig +from broker_daemon.models.orders import FillRecord, Side +from broker_daemon.models.portfolio import Balance +from broker_daemon.observability.fund_sync import FundSyncService + + +class _FakeProvider: + async def balance(self) -> Balance: + return Balance(cash=1_000.0, net_liquidation=1_000.0) + + +@pytest.mark.asyncio +async def test_sync_decision_and_fill_writes_expected_files(tmp_path: Path) -> None: + fund_dir = tmp_path / "fund-atlas" + fund_dir.mkdir(parents=True) + (fund_dir / "config.json").write_text( + json.dumps( + { + "name": "Atlas Fund", + "slug": "atlas", + "inception": "2026-02-20T00:00:00Z", + "currency": "USD", + "initialCapital": 1000.0, + "benchmarks": [], + } + ) + + "\n", + encoding="utf-8", + ) + + sync = FundSyncService( + ObservabilityConfig( + fund_dir=fund_dir, + auto_sync=True, + auto_push=False, + ), + provider=_FakeProvider(), + ) + + await sync.sync_decision( + decision_id="20260220T120000000000Z", + symbol="AAPL", + side=Side.BUY, + title="Initiate AAPL Position", + summary="Start a core position", + reasoning_markdown="## Thesis\nBuy quality compounder.", + ) + + await sync.sync_fill( + FillRecord( + fill_id="fill-1", + client_order_id="cid-1", + ib_order_id=101, + symbol="AAPL", + side=Side.BUY, + qty=2, + price=100.0, + commission=1.0, + decision_id="20260220T120000000000Z", + ) + ) + + decision_file = fund_dir / "decisions" / "20260220T120000000000Z.md" + assert decision_file.exists() + assert "Initiate AAPL Position" in decision_file.read_text(encoding="utf-8") + + fills = json.loads((fund_dir / "fills.json").read_text(encoding="utf-8")) + assert len(fills) == 1 + assert fills[0]["id"] == "fill-1" + assert fills[0]["side"] == "buy" + assert fills[0]["decisionId"] == "20260220T120000000000Z" + + cash_events = json.loads((fund_dir / "cash_events.json").read_text(encoding="utf-8")) + assert cash_events + assert cash_events[0]["type"] == "interest" + + +@pytest.mark.asyncio +async def test_sync_fill_deduplicates_by_fill_id(tmp_path: Path) -> None: + fund_dir = tmp_path / "fund-atlas" + fund_dir.mkdir(parents=True) + (fund_dir / "config.json").write_text( + json.dumps({"initialCapital": 1000.0}) + "\n", + encoding="utf-8", + ) + + sync = FundSyncService( + ObservabilityConfig( + fund_dir=fund_dir, + auto_sync=True, + auto_push=False, + ), + provider=_FakeProvider(), + ) + + fill = FillRecord( + fill_id="fill-dup", + client_order_id="cid-dup", + ib_order_id=11, + symbol="MSFT", + side=Side.SELL, + qty=1.0, + price=200.0, + ) + await sync.sync_fill(fill) + await sync.sync_fill(fill) + + rows = json.loads((fund_dir / "fills.json").read_text(encoding="utf-8")) + assert len(rows) == 1 + assert rows[0]["id"] == "fill-dup" diff --git a/install/steps/onboarding.sh b/install/steps/onboarding.sh index bdf00de..b644605 100644 --- a/install/steps/onboarding.sh +++ b/install/steps/onboarding.sh @@ -285,6 +285,323 @@ elif isinstance(value, (str, int, float)): PY } +read_observability_config_value() { + local key="$1" + if [[ ! -f "${BROKER_CONFIG_JSON}" ]]; then + return 0 + fi + if ! command -v python3 >/dev/null 2>&1; then + return 0 + fi + + python3 - "${BROKER_CONFIG_JSON}" "${key}" <<'PY' +import json +import sys +from pathlib import Path + +path = Path(sys.argv[1]) +key = sys.argv[2] +try: + data = json.loads(path.read_text(encoding="utf-8")) +except Exception: + sys.exit(0) + +if not isinstance(data, dict): + sys.exit(0) + +broker_cfg = data.get("broker") +if not isinstance(broker_cfg, dict): + sys.exit(0) + +observability = broker_cfg.get("observability") +if not isinstance(observability, dict): + sys.exit(0) + +value = observability.get(key) +if value is None: + sys.exit(0) + +if isinstance(value, bool): + print("true" if value else "false") +elif isinstance(value, (str, int, float)): + print(value) +PY +} + +save_observability_fund_dir() { + local fund_dir="$1" + if ! command -v python3 >/dev/null 2>&1; then + fail "python3 is required to update ${BROKER_CONFIG_JSON}." + fi + + BROKER_OBSERVABILITY_FUND_DIR="${fund_dir}" python3 - "${BROKER_CONFIG_JSON}" <<'PY' +import json +import os +import sys +from pathlib import Path + +config_path = Path(sys.argv[1]).expanduser() +config_path.parent.mkdir(parents=True, exist_ok=True) +fund_dir = os.environ.get("BROKER_OBSERVABILITY_FUND_DIR", "").strip() + +loaded: dict[str, object] = {} +if config_path.exists(): + try: + existing = json.loads(config_path.read_text(encoding="utf-8")) + if isinstance(existing, dict): + loaded = existing + except Exception: + loaded = {} + +broker_cfg = loaded.get("broker") +if not isinstance(broker_cfg, dict): + broker_cfg = {} + +observability_cfg = broker_cfg.get("observability") +if not isinstance(observability_cfg, dict): + observability_cfg = {} + +observability_cfg["fund_dir"] = fund_dir +observability_cfg["auto_sync"] = True +observability_cfg["auto_push"] = True +broker_cfg["observability"] = observability_cfg +loaded["broker"] = broker_cfg + +config_path.write_text(json.dumps(loaded, indent=2) + "\n", encoding="utf-8") +os.chmod(config_path, 0o600) +PY +} + +expand_path_value() { + local raw_path="$1" + if ! command -v python3 >/dev/null 2>&1; then + printf '%s\n' "${raw_path}" + return 0 + fi + BROKER_EXPAND_PATH="${raw_path}" python3 - <<'PY' +import os +from pathlib import Path + +raw = os.environ.get("BROKER_EXPAND_PATH", "").strip() +print(str(Path(raw).expanduser())) +PY +} + +fetch_initial_capital_from_provider() { + local setup_python="" + if [[ -x "${ROOT_DIR}/.venv/bin/python" ]]; then + setup_python="${ROOT_DIR}/.venv/bin/python" + elif command -v python3 >/dev/null 2>&1; then + setup_python="$(command -v python3)" + elif command -v python >/dev/null 2>&1; then + setup_python="$(command -v python)" + fi + + if [[ -z "${setup_python}" ]]; then + return 1 + fi + + local setup_pythonpath="${ROOT_DIR}/daemon/src:${ROOT_DIR}/sdk/python/src:${ROOT_DIR}/cli/src" + if [[ -n "${PYTHONPATH:-}" ]]; then + setup_pythonpath="${setup_pythonpath}:${PYTHONPATH}" + fi + + PYTHONPATH="${setup_pythonpath}" "${setup_python}" - <<'PY' +import asyncio +import sys + +from broker_daemon.config import load_config +from broker_daemon.providers.ib import IBProvider + + +async def main() -> None: + cfg = load_config() + if cfg.provider == "etrade": + from broker_daemon.providers.etrade import ETradeProvider + + provider = ETradeProvider(cfg.etrade) + else: + provider = IBProvider(cfg.gateway) + + try: + await provider.start() + balance = await provider.balance() + initial_capital = balance.net_liquidation if balance.net_liquidation is not None else balance.cash + if initial_capital is None: + raise RuntimeError("provider returned no net_liquidation or cash value") + print(float(initial_capital)) + finally: + try: + await provider.stop() + except Exception: + pass + + +try: + asyncio.run(main()) +except Exception as exc: + print(str(exc), file=sys.stderr) + sys.exit(1) +PY +} + +initialize_fund_repo_files() { + local fund_dir="$1" + local fund_name="$2" + local fund_slug="$3" + local inception_timestamp="$4" + local initial_capital="$5" + + BROKER_FUND_DIR="${fund_dir}" \ + BROKER_FUND_NAME="${fund_name}" \ + BROKER_FUND_SLUG="${fund_slug}" \ + BROKER_FUND_INCEPTION="${inception_timestamp}" \ + BROKER_FUND_INITIAL_CAPITAL="${initial_capital}" \ + python3 - <<'PY' +import json +import os +from pathlib import Path + +fund_dir = Path(os.environ["BROKER_FUND_DIR"]).expanduser() +fund_name = os.environ["BROKER_FUND_NAME"] +fund_slug = os.environ["BROKER_FUND_SLUG"] +inception = os.environ["BROKER_FUND_INCEPTION"] +initial_capital = float(os.environ["BROKER_FUND_INITIAL_CAPITAL"]) + +fund_dir.mkdir(parents=True, exist_ok=True) +(fund_dir / "decisions").mkdir(parents=True, exist_ok=True) + +config = { + "name": fund_name, + "slug": fund_slug, + "inception": inception, + "currency": "USD", + "initialCapital": initial_capital, + "benchmarks": [], + "cashInterestPolicy": { + "enabled": True, + "source": "inferred_from_broker_cash_balance" + }, +} + +(fund_dir / "config.json").write_text(json.dumps(config, indent=2) + "\n", encoding="utf-8") +(fund_dir / "fills.json").write_text("[]\n", encoding="utf-8") +(fund_dir / "cash_events.json").write_text("[]\n", encoding="utf-8") +PY +} + +initialize_fund_repo_git() { + local fund_dir="$1" + local origin_url="$2" + + if ! command -v git >/dev/null 2>&1; then + warn "Git not found. Skipping git repository initialization for fund directory." + return 0 + fi + + if ! git -C "${fund_dir}" rev-parse --is-inside-work-tree >/dev/null 2>&1; then + if ! git -C "${fund_dir}" init -b main >/dev/null 2>&1; then + git -C "${fund_dir}" init >/dev/null 2>&1 || fail "Failed to initialize git repository at ${fund_dir}." + fi + fi + + git -C "${fund_dir}" add config.json fills.json cash_events.json decisions >/dev/null 2>&1 || true + git -C "${fund_dir}" commit -m "Initialize fund repository" >/dev/null 2>&1 || true + + if [[ -n "${origin_url}" ]]; then + if git -C "${fund_dir}" remote get-url origin >/dev/null 2>&1; then + git -C "${fund_dir}" remote set-url origin "${origin_url}" >/dev/null 2>&1 || true + else + git -C "${fund_dir}" remote add origin "${origin_url}" >/dev/null 2>&1 || true + fi + if ! git -C "${fund_dir}" push -u origin HEAD >/dev/null 2>&1; then + warn "Initial push to origin failed. Setup will continue; auto-sync pushes may fail until auth/remote is fixed." + fi + fi +} + +configure_fund_repository() { + if ! has_prompt_tty; then + warn "No interactive terminal input available. Skipping fund repository setup." + return 0 + fi + + if ! command -v python3 >/dev/null 2>&1; then + fail "python3 is required for fund repository setup." + fi + + tty_printf "%b%s%b\n" "${BOLD}" "Fund Observability Repository" "${RESET}" + + local current_fund_dir="" + current_fund_dir="$(trim_input "$(read_observability_config_value "fund_dir" || true)")" + + local fund_dir_input="" + read_line_input "Fund directory path" "${current_fund_dir}" fund_dir_input + fund_dir_input="$(trim_input "${fund_dir_input}")" + while [[ -z "${fund_dir_input}" ]]; do + tty_printf "%b%s%b\n" "${YELLOW}" "Fund directory path is required." "${RESET}" + read_line_input "Fund directory path" "${current_fund_dir}" fund_dir_input + fund_dir_input="$(trim_input "${fund_dir_input}")" + done + + local fund_dir="" + fund_dir="$(expand_path_value "${fund_dir_input}")" + save_observability_fund_dir "${fund_dir}" + + if [[ -e "${fund_dir}" && ! -d "${fund_dir}" ]]; then + fail "Fund path exists but is not a directory: ${fund_dir}" + fi + + if [[ -d "${fund_dir}" ]]; then + tty_printf "%b✔%b %s\n" "${GREEN}" "${RESET}" "Using existing fund directory ${fund_dir}; skipping initialization." + return 0 + fi + + local fund_name="" + read_line_input "Fund name" "" fund_name + fund_name="$(trim_input "${fund_name}")" + while [[ -z "${fund_name}" ]]; do + tty_printf "%b%s%b\n" "${YELLOW}" "Fund name is required." "${RESET}" + read_line_input "Fund name" "" fund_name + fund_name="$(trim_input "${fund_name}")" + done + + local fund_slug="" + read_line_input "Fund slug" "" fund_slug + fund_slug="$(trim_input "${fund_slug}")" + while [[ -z "${fund_slug}" ]]; do + tty_printf "%b%s%b\n" "${YELLOW}" "Fund slug is required." "${RESET}" + read_line_input "Fund slug" "" fund_slug + fund_slug="$(trim_input "${fund_slug}")" + done + + local origin_git="" + read_line_input "Git origin URL" "" origin_git + origin_git="$(trim_input "${origin_git}")" + while [[ -z "${origin_git}" ]]; do + tty_printf "%b%s%b\n" "${YELLOW}" "Git origin URL is required." "${RESET}" + read_line_input "Git origin URL" "" origin_git + origin_git="$(trim_input "${origin_git}")" + done + + local initial_capital="" + if ! initial_capital="$(fetch_initial_capital_from_provider 2>/dev/null)"; then + fail "Could not fetch initial capital from provider. Ensure broker credentials/connectivity are valid, then rerun broker setup." + fi + initial_capital="$(trim_input "${initial_capital}")" + if [[ -z "${initial_capital}" ]]; then + fail "Provider returned an empty initial capital value." + fi + + local inception_timestamp="" + inception_timestamp="$(date -u +"%Y-%m-%dT%H:%M:%SZ")" + + initialize_fund_repo_files "${fund_dir}" "${fund_name}" "${fund_slug}" "${inception_timestamp}" "${initial_capital}" + initialize_fund_repo_git "${fund_dir}" "${origin_git}" + + tty_printf "%b✔%b %s\n" "${GREEN}" "${RESET}" "Initialized fund repository at ${fund_dir}" +} + save_selected_provider() { local provider="$1" diff --git a/install/steps/summary.sh b/install/steps/summary.sh index e62fbea..496c5f8 100644 --- a/install/steps/summary.sh +++ b/install/steps/summary.sh @@ -41,9 +41,42 @@ PY printf '%s\n' "${provider}" } +resolve_summary_fund_dir() { + if [[ ! -f "${BROKER_CONFIG_JSON}" ]] || ! command -v python3 >/dev/null 2>&1; then + return 0 + fi + python3 - "${BROKER_CONFIG_JSON}" <<'PY' +import json +import sys +from pathlib import Path + +config_path = Path(sys.argv[1]).expanduser() +if not config_path.exists(): + sys.exit(0) +try: + loaded = json.loads(config_path.read_text(encoding="utf-8")) +except Exception: + sys.exit(0) + +if not isinstance(loaded, dict): + sys.exit(0) +broker_cfg = loaded.get("broker") +if not isinstance(broker_cfg, dict): + sys.exit(0) +observability = broker_cfg.get("observability") +if not isinstance(observability, dict): + sys.exit(0) +fund_dir = observability.get("fund_dir") +if isinstance(fund_dir, str) and fund_dir.strip(): + print(fund_dir.strip()) +PY +} + print_summary() { local provider provider="$(resolve_summary_provider)" + local fund_dir + fund_dir="$(resolve_summary_fund_dir || true)" if [[ "${provider}" == "etrade" ]]; then cat < dict[str, Any]: params: dict[str, Any] = { "side": side, @@ -234,6 +237,12 @@ async def order( params["idempotency_key"] = idempotency_key if dry_run: params["dry_run"] = True + if decision_name: + params["decision_name"] = decision_name + if decision_summary: + params["decision_summary"] = decision_summary + if decision_reasoning: + params["decision_reasoning"] = decision_reasoning return await self._request("order.place", params) async def bracket( @@ -246,19 +255,26 @@ async def bracket( tp: float, sl: float, tif: TimeInForce = "DAY", + decision_name: str | None = None, + decision_summary: str | None = None, + decision_reasoning: str | None = None, ) -> dict[str, Any]: - return await self._request( - "order.bracket", - { - "side": side, - "symbol": symbol, - "qty": qty, - "entry": entry, - "tp": tp, - "sl": sl, - "tif": tif, - }, - ) + params: dict[str, Any] = { + "side": side, + "symbol": symbol, + "qty": qty, + "entry": entry, + "tp": tp, + "sl": sl, + "tif": tif, + } + if decision_name: + params["decision_name"] = decision_name + if decision_summary: + params["decision_summary"] = decision_summary + if decision_reasoning: + params["decision_reasoning"] = decision_reasoning + return await self._request("order.bracket", params) async def order_status(self, order_id: str) -> dict[str, Any]: return await self._request("order.status", {"order_id": order_id}) diff --git a/sdk/typescript/src/client.ts b/sdk/typescript/src/client.ts index 2482359..463bb2f 100644 --- a/sdk/typescript/src/client.ts +++ b/sdk/typescript/src/client.ts @@ -264,11 +264,20 @@ export class Client { if (input.dry_run !== undefined) { params.dry_run = input.dry_run; } + if (input.decision_name) { + params.decision_name = input.decision_name; + } + if (input.decision_summary) { + params.decision_summary = input.decision_summary; + } + if (input.decision_reasoning) { + params.decision_reasoning = input.decision_reasoning; + } return this.request("order.place", params); } async bracket(input: BracketInput): Promise { - return this.request("order.bracket", { + const params: CommandParams<"order.bracket"> = { side: input.side, symbol: input.symbol, qty: input.qty, @@ -276,7 +285,17 @@ export class Client { tp: input.tp, sl: input.sl, tif: input.tif ?? "DAY" - }); + }; + if (input.decision_name) { + params.decision_name = input.decision_name; + } + if (input.decision_summary) { + params.decision_summary = input.decision_summary; + } + if (input.decision_reasoning) { + params.decision_reasoning = input.decision_reasoning; + } + return this.request("order.bracket", params); } async orderStatus(orderId: string): Promise { diff --git a/sdk/typescript/src/commands.ts b/sdk/typescript/src/commands.ts index 8ea655b..42db551 100644 --- a/sdk/typescript/src/commands.ts +++ b/sdk/typescript/src/commands.ts @@ -93,6 +93,9 @@ export interface CommandMap { client_order_id?: string; idempotency_key?: string; dry_run?: boolean; + decision_name?: string; + decision_summary?: string; + decision_reasoning?: string; }, OrderPlaceResponse >; @@ -105,6 +108,9 @@ export interface CommandMap { tp: number; sl: number; tif?: TimeInForce; + decision_name?: string; + decision_summary?: string; + decision_reasoning?: string; }, OrderBracketResponse >; diff --git a/sdk/typescript/src/sdk-types.ts b/sdk/typescript/src/sdk-types.ts index bf378d0..8f16dad 100644 --- a/sdk/typescript/src/sdk-types.ts +++ b/sdk/typescript/src/sdk-types.ts @@ -183,10 +183,12 @@ export interface FillRecord { client_order_id: string; ib_order_id: number | null; symbol: string; + side?: OrderSide | null; qty: number; price: number; commission?: number | null; timestamp: string; + decision_id?: string | null; } export interface RiskCheckResult { @@ -419,6 +421,9 @@ export interface OrderInput { client_order_id?: string; idempotency_key?: string; dry_run?: boolean; + decision_name?: string; + decision_summary?: string; + decision_reasoning?: string; } export interface BracketInput { @@ -429,6 +434,9 @@ export interface BracketInput { tp: number; sl: number; tif?: TimeInForce; + decision_name?: string; + decision_summary?: string; + decision_reasoning?: string; } export interface RiskCheckInput { diff --git a/setup.sh b/setup.sh index 2338e37..90ac48b 100755 --- a/setup.sh +++ b/setup.sh @@ -105,6 +105,9 @@ else run_etrade_oauth_flow fi +echo "" +configure_fund_repository + # ─── Done ───────────────────────────────────────────────────────────────────── echo "" diff --git a/skills/SKILL.md b/skills/broker/SKILL.md similarity index 85% rename from skills/SKILL.md rename to skills/broker/SKILL.md index 886e5a9..49b557e 100644 --- a/skills/SKILL.md +++ b/skills/broker/SKILL.md @@ -12,11 +12,13 @@ Use this guide as an expanded Markdown help menu for `broker`. - Run help on any command with `-h` or `--help`. - Expect JSON output from commands. - Use command typo suggestions from the CLI if you mistype a command name. +- `broker setup` now configures provider credentials and fund observability repo wiring. ## Top-Level Command Map ```text broker daemon ... # daemon lifecycle +broker setup ... # provider + fund observability setup broker quote ... # quote snapshot broker watch ... # streaming quote refresh broker chain ... # option chain @@ -38,6 +40,18 @@ broker override ... # temporary risk override broker audit ... # grouped audit commands ``` +## Setup Command + +### `broker setup` + +Interactive setup for broker provider + fund observability repo bootstrap. + +- Prompts for fund directory. +- If the directory already exists, setup reuses it and skips fund bootstrap prompts. +- For new directories, setup prompts for fund name, fund slug, and git origin URL. +- Initial capital is fetched from the configured broker provider. +- Setup initializes `config.json`, `fills.json`, `cash_events.json`, `decisions/`, and a git repo with `origin`. + ## Daemon Commands ### `broker daemon start` @@ -128,34 +142,46 @@ broker history SYMBOL --period PERIOD --bar BAR [--rth-only] Place buy order. Market by default unless `--limit` and/or `--stop` is set. ```bash -broker order buy SYMBOL QTY [--limit PRICE] [--stop PRICE] [--tif DAY|GTC|IOC] +broker order buy SYMBOL QTY [--limit PRICE] [--stop PRICE] [--tif DAY|GTC|IOC] \ + --decision-name "Title Case Decision" \ + --decision-summary "One-line plain text summary" \ + --decision-reasoning "## Markdown reasoning..." ``` - `QTY` must be `> 0`. - Default `--tif`: `DAY` +- Decision flags are required for submitted orders. ### `broker order sell` Place sell order. Market by default unless `--limit` and/or `--stop` is set. ```bash -broker order sell SYMBOL QTY [--limit PRICE] [--stop PRICE] [--tif DAY|GTC|IOC] +broker order sell SYMBOL QTY [--limit PRICE] [--stop PRICE] [--tif DAY|GTC|IOC] \ + --decision-name "Title Case Decision" \ + --decision-summary "One-line plain text summary" \ + --decision-reasoning "## Markdown reasoning..." ``` - `QTY` must be `> 0`. - Default `--tif`: `DAY` +- Decision flags are required for submitted orders. ### `broker order bracket` Place bracket order (entry + take-profit + stop-loss). ```bash -broker order bracket SYMBOL QTY --entry PRICE --tp PRICE --sl PRICE [--side buy|sell] [--tif DAY|GTC|IOC] +broker order bracket SYMBOL QTY --entry PRICE --tp PRICE --sl PRICE [--side buy|sell] [--tif DAY|GTC|IOC] \ + --decision-name "Title Case Decision" \ + --decision-summary "One-line plain text summary" \ + --decision-reasoning "## Markdown reasoning..." ``` - Required: `--entry`, `--tp`, `--sl` - Default `--side`: `buy` - Default `--tif`: `DAY` +- Decision flags are required. ### `broker order status`