Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion daemon/src/broker_daemon/daemon/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(self, cfg: AppConfig) -> None:
else:
self._provider = IBProvider(cfg.gateway, audit=self._audit, event_cb=self._on_broker_event)

self._fund_sync = FundSyncService(cfg.observability, provider=self._provider)
self._fund_sync = FundSyncService(cfg.observability)
self._market_data = MarketDataService(self._provider, settings=cfg.market_data)
self._orders = OrderManager(
provider=self._provider,
Expand Down
88 changes: 2 additions & 86 deletions daemon/src/broker_daemon/observability/fund_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,10 @@

from broker_daemon.config import ObservabilityConfig
from broker_daemon.models.orders import FillRecord, Side
from broker_daemon.providers.base import BrokerProvider

logger = logging.getLogger(__name__)

FUND_CONFIG = "config.json"
FUND_FILLS = "fills.json"
FUND_CASH_EVENTS = "cash_events.json"
FUND_DECISIONS_DIR = "decisions"


Expand All @@ -34,13 +31,6 @@ def _decision_timestamp_id(ts: datetime | None = None) -> str:
return (ts or _utc_now()).strftime("%Y%m%dT%H%M%S%fZ")


def _safe_float(value: Any, *, default: float = 0.0) -> float:
try:
return float(value)
except Exception:
return default


def _yaml_quoted(value: str) -> str:
# JSON quoted strings are valid YAML scalar strings.
return json.dumps(value)
Expand All @@ -49,9 +39,8 @@ def _yaml_quoted(value: str) -> str:
class FundSyncService:
"""Handles append-only observability file writes and git push behavior."""

def __init__(self, cfg: ObservabilityConfig, *, provider: BrokerProvider | None = None) -> None:
def __init__(self, cfg: ObservabilityConfig) -> None:
self._cfg = cfg
self._provider = provider
self._fund_dir = cfg.fund_dir
self._lock = asyncio.Lock()

Expand Down Expand Up @@ -123,13 +112,9 @@ async def sync_fill(self, fill: FillRecord) -> None:
)
self._write_json_atomic(fills_path, rows)

changed_paths = [fills_path]
if await self._sync_interest_from_balance_locked():
changed_paths.append(self._fund_dir / FUND_CASH_EVENTS)

await self._commit_and_push(
message=f"fill: {fill.fill_id}",
changed_paths=changed_paths,
changed_paths=[fills_path],
)
except Exception:
logger.exception("fund sync: failed to sync fill %s", fill.fill_id)
Expand All @@ -139,7 +124,6 @@ def _ensure_repo_layout(self) -> None:
self._fund_dir.mkdir(parents=True, exist_ok=True)
(self._fund_dir / FUND_DECISIONS_DIR).mkdir(parents=True, exist_ok=True)
self._ensure_json_array_file(self._fund_dir / FUND_FILLS)
self._ensure_json_array_file(self._fund_dir / FUND_CASH_EVENTS)

def _ensure_json_array_file(self, path: Path) -> None:
if path.exists():
Expand All @@ -157,17 +141,6 @@ def _read_json_array(self, path: Path) -> list[dict[str, Any]]:
return []
return [row for row in loaded if isinstance(row, dict)]

def _read_json_object(self, path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
loaded = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
if not isinstance(loaded, dict):
return {}
return loaded

def _write_if_changed(self, path: Path, content: str) -> bool:
existing = ""
if path.exists():
Expand Down Expand Up @@ -215,63 +188,6 @@ def _normalize_side(self, side: Side | None) -> str:
return "sell"
return "buy"

async def _sync_interest_from_balance_locked(self) -> bool:
if self._provider is None:
return False
assert self._fund_dir is not None

try:
balance = await self._provider.balance()
except Exception:
return False

if balance.cash is None:
return False

config_path = self._fund_dir / FUND_CONFIG
config = self._read_json_object(config_path)
initial_capital = _safe_float(config.get("initialCapital"), default=0.0)

fills = self._read_json_array(self._fund_dir / FUND_FILLS)
trade_cash_delta = 0.0
for row in fills:
qty = _safe_float(row.get("qty"))
price = _safe_float(row.get("price"))
commission = _safe_float(row.get("commission"))
side = str(row.get("side", "")).strip().lower()
if side == "sell":
trade_cash_delta += qty * price
else:
trade_cash_delta -= qty * price
trade_cash_delta -= commission

expected_cash_without_interest = initial_capital + trade_cash_delta
inferred_total_interest = float(balance.cash) - expected_cash_without_interest

cash_events_path = self._fund_dir / FUND_CASH_EVENTS
cash_events = self._read_json_array(cash_events_path)
known_interest = sum(
_safe_float(event.get("amount"))
for event in cash_events
if str(event.get("type", "")).strip().lower() == "interest"
)
delta_interest = round(inferred_total_interest - known_interest, 2)
if abs(delta_interest) < 0.01:
return False

ts = _utc_now()
cash_events.append(
{
"id": f"interest-{_decision_timestamp_id(ts)}",
"type": "interest",
"amount": delta_interest,
"timestamp": _iso_utc(ts),
"source": "inferred_from_broker_cash_balance",
}
)
self._write_json_atomic(cash_events_path, cash_events)
return True

async def _commit_and_push(self, *, message: str, changed_paths: list[Path]) -> None:
if not changed_paths:
return
Expand Down
12 changes: 0 additions & 12 deletions daemon/tests/test_daemon/test_fund_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,9 @@

from broker_daemon.config import ObservabilityConfig
from broker_daemon.models.orders import FillRecord, Side
from broker_daemon.models.portfolio import Balance
from broker_daemon.observability.fund_sync import FundSyncService


class _FakeProvider:
async def balance(self) -> Balance:
return Balance(cash=1_000.0, net_liquidation=1_000.0)


@pytest.mark.asyncio
async def test_sync_decision_and_fill_writes_expected_files(tmp_path: Path) -> None:
fund_dir = tmp_path / "fund-atlas"
Expand All @@ -41,7 +35,6 @@ async def test_sync_decision_and_fill_writes_expected_files(tmp_path: Path) -> N
auto_sync=True,
auto_push=False,
),
provider=_FakeProvider(),
)

await sync.sync_decision(
Expand Down Expand Up @@ -77,10 +70,6 @@ async def test_sync_decision_and_fill_writes_expected_files(tmp_path: Path) -> N
assert fills[0]["side"] == "buy"
assert fills[0]["decisionId"] == "20260220T120000000000Z"

cash_events = json.loads((fund_dir / "cash_events.json").read_text(encoding="utf-8"))
assert cash_events
assert cash_events[0]["type"] == "interest"


@pytest.mark.asyncio
async def test_sync_fill_deduplicates_by_fill_id(tmp_path: Path) -> None:
Expand All @@ -97,7 +86,6 @@ async def test_sync_fill_deduplicates_by_fill_id(tmp_path: Path) -> None:
auto_sync=True,
auto_push=False,
),
provider=_FakeProvider(),
)

fill = FillRecord(
Expand Down