diff --git a/daemon/src/broker_daemon/daemon/server.py b/daemon/src/broker_daemon/daemon/server.py index 1a06182..00f7046 100644 --- a/daemon/src/broker_daemon/daemon/server.py +++ b/daemon/src/broker_daemon/daemon/server.py @@ -83,7 +83,7 @@ def __init__(self, cfg: AppConfig) -> None: else: self._provider = IBProvider(cfg.gateway, audit=self._audit, event_cb=self._on_broker_event) - self._fund_sync = FundSyncService(cfg.observability, provider=self._provider) + self._fund_sync = FundSyncService(cfg.observability) self._market_data = MarketDataService(self._provider, settings=cfg.market_data) self._orders = OrderManager( provider=self._provider, diff --git a/daemon/src/broker_daemon/observability/fund_sync.py b/daemon/src/broker_daemon/observability/fund_sync.py index b44aff3..1c7a581 100644 --- a/daemon/src/broker_daemon/observability/fund_sync.py +++ b/daemon/src/broker_daemon/observability/fund_sync.py @@ -12,13 +12,10 @@ from broker_daemon.config import ObservabilityConfig from broker_daemon.models.orders import FillRecord, Side -from broker_daemon.providers.base import BrokerProvider logger = logging.getLogger(__name__) -FUND_CONFIG = "config.json" FUND_FILLS = "fills.json" -FUND_CASH_EVENTS = "cash_events.json" FUND_DECISIONS_DIR = "decisions" @@ -34,13 +31,6 @@ def _decision_timestamp_id(ts: datetime | None = None) -> str: return (ts or _utc_now()).strftime("%Y%m%dT%H%M%S%fZ") -def _safe_float(value: Any, *, default: float = 0.0) -> float: - try: - return float(value) - except Exception: - return default - - def _yaml_quoted(value: str) -> str: # JSON quoted strings are valid YAML scalar strings. return json.dumps(value) @@ -49,9 +39,8 @@ def _yaml_quoted(value: str) -> str: class FundSyncService: """Handles append-only observability file writes and git push behavior.""" - def __init__(self, cfg: ObservabilityConfig, *, provider: BrokerProvider | None = None) -> None: + def __init__(self, cfg: ObservabilityConfig) -> None: self._cfg = cfg - self._provider = provider self._fund_dir = cfg.fund_dir self._lock = asyncio.Lock() @@ -123,13 +112,9 @@ async def sync_fill(self, fill: FillRecord) -> None: ) self._write_json_atomic(fills_path, rows) - changed_paths = [fills_path] - if await self._sync_interest_from_balance_locked(): - changed_paths.append(self._fund_dir / FUND_CASH_EVENTS) - await self._commit_and_push( message=f"fill: {fill.fill_id}", - changed_paths=changed_paths, + changed_paths=[fills_path], ) except Exception: logger.exception("fund sync: failed to sync fill %s", fill.fill_id) @@ -139,7 +124,6 @@ def _ensure_repo_layout(self) -> None: self._fund_dir.mkdir(parents=True, exist_ok=True) (self._fund_dir / FUND_DECISIONS_DIR).mkdir(parents=True, exist_ok=True) self._ensure_json_array_file(self._fund_dir / FUND_FILLS) - self._ensure_json_array_file(self._fund_dir / FUND_CASH_EVENTS) def _ensure_json_array_file(self, path: Path) -> None: if path.exists(): @@ -157,17 +141,6 @@ def _read_json_array(self, path: Path) -> list[dict[str, Any]]: return [] return [row for row in loaded if isinstance(row, dict)] - def _read_json_object(self, path: Path) -> dict[str, Any]: - if not path.exists(): - return {} - try: - loaded = json.loads(path.read_text(encoding="utf-8")) - except Exception: - return {} - if not isinstance(loaded, dict): - return {} - return loaded - def _write_if_changed(self, path: Path, content: str) -> bool: existing = "" if path.exists(): @@ -215,63 +188,6 @@ def _normalize_side(self, side: Side | None) -> str: return "sell" return "buy" - async def _sync_interest_from_balance_locked(self) -> bool: - if self._provider is None: - return False - assert self._fund_dir is not None - - try: - balance = await self._provider.balance() - except Exception: - return False - - if balance.cash is None: - return False - - config_path = self._fund_dir / FUND_CONFIG - config = self._read_json_object(config_path) - initial_capital = _safe_float(config.get("initialCapital"), default=0.0) - - fills = self._read_json_array(self._fund_dir / FUND_FILLS) - trade_cash_delta = 0.0 - for row in fills: - qty = _safe_float(row.get("qty")) - price = _safe_float(row.get("price")) - commission = _safe_float(row.get("commission")) - side = str(row.get("side", "")).strip().lower() - if side == "sell": - trade_cash_delta += qty * price - else: - trade_cash_delta -= qty * price - trade_cash_delta -= commission - - expected_cash_without_interest = initial_capital + trade_cash_delta - inferred_total_interest = float(balance.cash) - expected_cash_without_interest - - cash_events_path = self._fund_dir / FUND_CASH_EVENTS - cash_events = self._read_json_array(cash_events_path) - known_interest = sum( - _safe_float(event.get("amount")) - for event in cash_events - if str(event.get("type", "")).strip().lower() == "interest" - ) - delta_interest = round(inferred_total_interest - known_interest, 2) - if abs(delta_interest) < 0.01: - return False - - ts = _utc_now() - cash_events.append( - { - "id": f"interest-{_decision_timestamp_id(ts)}", - "type": "interest", - "amount": delta_interest, - "timestamp": _iso_utc(ts), - "source": "inferred_from_broker_cash_balance", - } - ) - self._write_json_atomic(cash_events_path, cash_events) - return True - async def _commit_and_push(self, *, message: str, changed_paths: list[Path]) -> None: if not changed_paths: return diff --git a/daemon/tests/test_daemon/test_fund_sync.py b/daemon/tests/test_daemon/test_fund_sync.py index 8b06916..d0900a9 100644 --- a/daemon/tests/test_daemon/test_fund_sync.py +++ b/daemon/tests/test_daemon/test_fund_sync.py @@ -7,15 +7,9 @@ from broker_daemon.config import ObservabilityConfig from broker_daemon.models.orders import FillRecord, Side -from broker_daemon.models.portfolio import Balance from broker_daemon.observability.fund_sync import FundSyncService -class _FakeProvider: - async def balance(self) -> Balance: - return Balance(cash=1_000.0, net_liquidation=1_000.0) - - @pytest.mark.asyncio async def test_sync_decision_and_fill_writes_expected_files(tmp_path: Path) -> None: fund_dir = tmp_path / "fund-atlas" @@ -41,7 +35,6 @@ async def test_sync_decision_and_fill_writes_expected_files(tmp_path: Path) -> N auto_sync=True, auto_push=False, ), - provider=_FakeProvider(), ) await sync.sync_decision( @@ -77,10 +70,6 @@ async def test_sync_decision_and_fill_writes_expected_files(tmp_path: Path) -> N assert fills[0]["side"] == "buy" assert fills[0]["decisionId"] == "20260220T120000000000Z" - cash_events = json.loads((fund_dir / "cash_events.json").read_text(encoding="utf-8")) - assert cash_events - assert cash_events[0]["type"] == "interest" - @pytest.mark.asyncio async def test_sync_fill_deduplicates_by_fill_id(tmp_path: Path) -> None: @@ -97,7 +86,6 @@ async def test_sync_fill_deduplicates_by_fill_id(tmp_path: Path) -> None: auto_sync=True, auto_push=False, ), - provider=_FakeProvider(), ) fill = FillRecord(