Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 58 additions & 39 deletions strategies/market_making_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,20 @@ def run(self, coins: List[str]) -> None:
# No position — normal MM flow
close_oid = self._closer.get_close_oid(coin)

# Compute ideal prices once per cycle and reuse for both the
# tolerance refresh decision and order placement. This also
# ensures the volatility rolling buffer is updated exactly
# once per cycle (inside ``_compute_ideal_prices``), so the
# tolerance check and the placed order are evaluated against
# the same buffer state.
ideal = self._compute_ideal_prices(coin)

if getattr(self, 'refresh_tolerance_bp', 0) > 0:
ideal = self._compute_ideal_prices(coin)
if ideal is None:
# Fall back to age-only when ideal price is unavailable
self._tracker.cancel_stale_orders(coin, close_oid=close_oid)
else:
ideal_buy, ideal_sell = ideal
ideal_buy, ideal_sell, _ = ideal
self._tracker.refresh_orders_with_tolerance(
coin,
ideal_prices={
Expand Down Expand Up @@ -461,7 +468,7 @@ def run(self, coins: List[str]) -> None:
logger.info(f"[mm] {coin} cooldown expired, resuming")

if self._tracker.get_order_count(coin) < self.max_open_orders:
self._place_orders(coin)
self._place_orders(coin, ideal_prices=ideal)

except API_ERRORS as e:
logger.error(f"[mm] Error processing {coin}: {e}")
Expand Down Expand Up @@ -644,14 +651,18 @@ def _get_dynamic_position_age(self, coin: str) -> Optional[float]:

return age

def _compute_ideal_prices(self, coin: str) -> Optional[Tuple[float, float]]:
"""Compute current ideal ``(buy_price, sell_price)`` for ``coin``.
def _compute_ideal_prices(self, coin: str) -> Optional[Tuple[float, float, float]]:
"""Compute current ideal ``(buy_price, sell_price, skew_bps)`` for ``coin``.

Mirrors the price computation that was previously inlined in
:meth:`_place_orders`. Updates the volatility rolling buffer via
:meth:`_record_mid_price` so callers can rely on this method as the
single source of truth for both order placement and refresh-tolerance
drift evaluation. Returns ``None`` when the ideal price cannot be
determined (no market data, mid <= 0).

Mirrors the price computation in :meth:`_place_orders` but is a
pure helper (no side effects on rolling buffers, no order placement).
Used by the run loop to evaluate refresh-tolerance drift before
deciding whether to keep existing orders. Returns ``None`` when the
ideal price cannot be determined (no market data, mid <= 0).
The ``skew_bps`` value is returned alongside the prices so callers can
log it without recomputing :meth:`_calculate_inventory_skew`.
"""
market_data = self.market_data.get_market_data(coin)
if not market_data:
Expand All @@ -664,6 +675,10 @@ def _compute_ideal_prices(self, coin: str) -> Optional[Tuple[float, float]]:
rp = self.market_data.price_rounding_params(coin)

if self.bbo_mode and market_data.bid > 0 and market_data.ask > 0:
# Update the rolling vol buffer here so the volatility-adjusted
# offset below sees the freshest sample in both call paths
# (run-loop tolerance check and order placement).
self._record_mid_price(coin, mid_price)
base_offset = self._get_coin_offset(coin)
if self.vol_adjust_enabled:
effective_offset_bps = self._get_volatility_adjusted_offset(coin, base_offset)
Expand Down Expand Up @@ -708,41 +723,43 @@ def _compute_ideal_prices(self, coin: str) -> Optional[Tuple[float, float]]:
buy_price = round_price(buy_price * (1 - skew_mult), *rp)
sell_price = round_price(sell_price * (1 - skew_mult), *rp)

return buy_price, sell_price
return buy_price, sell_price, skew

def _place_orders(self, coin: str) -> None:
def _place_orders(
self,
coin: str,
ideal_prices: Optional[Tuple[float, float, float]] = None,
) -> None:
"""Place a buy and a sell limit order.

In BBO mode, orders are placed at or near the best bid/ask.
Otherwise, orders are placed symmetrically around mid price at
``spread_bps``. Uses ``bulk_place_orders`` for a single API call.

``ideal_prices`` is the pre-computed ``(buy, sell, skew_bps)`` tuple
produced by :meth:`_compute_ideal_prices` earlier in the same cycle.
When ``None`` (legacy callers / tests that bypass the run loop), the
prices are computed inline. Threading the cached tuple in from the
run loop avoids a redundant compute and double-update of the
volatility buffer.
"""
from order_manager import Order

if coin in self._closer.tracked_coins:
return

market_data = self.market_data.get_market_data(coin)
if not market_data:
logger.warning(f"[mm] No market data for {coin}, skipping")
return

mid_price = market_data.mid_price
if mid_price <= 0:
return

# Volatility buffer is updated here so it occurs once per placement
# cycle even when ``_compute_ideal_prices`` is also called from the
# run loop (which is a pure helper).
if self.bbo_mode and market_data.bid > 0 and market_data.ask > 0:
self._record_mid_price(coin, mid_price)

prices = self._compute_ideal_prices(coin)
if prices is None:
return
buy_price, sell_price = prices
if ideal_prices is None:
prices = self._compute_ideal_prices(coin)
if prices is None:
# Distinguish "no market data" so the warning matches the
# pre-refactor log shape used by tests/operators.
if not self.market_data.get_market_data(coin):
logger.warning(f"[mm] No market data for {coin}, skipping")
return
else:
prices = ideal_prices

skew = self._calculate_inventory_skew(coin, mid_price)
buy_price, sell_price, skew = prices
if skew != 0.0:
logger.debug(f"[mm] Inventory skew {coin}: {skew:.1f}bps")

Expand Down Expand Up @@ -771,13 +788,15 @@ def _place_orders(self, coin: str) -> None:
skip_buy = False
skip_sell = False
if self.imbalance_threshold > 0:
imb = market_data.book_imbalance
if imb < -self.imbalance_threshold:
skip_buy = True
logger.debug(f"[mm] {coin} skipping BUY (book imbalance {imb:.2f})")
elif imb > self.imbalance_threshold:
skip_sell = True
logger.debug(f"[mm] {coin} skipping SELL (book imbalance {imb:.2f})")
market_data = self.market_data.get_market_data(coin)
if market_data is not None:
imb = market_data.book_imbalance
if imb < -self.imbalance_threshold:
skip_buy = True
logger.debug(f"[mm] {coin} skipping BUY (book imbalance {imb:.2f})")
elif imb > self.imbalance_threshold:
skip_sell = True
logger.debug(f"[mm] {coin} skipping SELL (book imbalance {imb:.2f})")

if (
current_count < self.max_open_orders
Expand Down
136 changes: 129 additions & 7 deletions tests/test_refresh_tolerance.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,12 @@ def test_spread_mode_symmetric_around_mid(self):

result = s._compute_ideal_prices("BTC")
assert result is not None
buy, sell = result
buy, sell, skew = result
# 10bp around 100.0 = 99.9 / 100.1
assert abs(buy - 99.9) < 1e-6
assert abs(sell - 100.1) < 1e-6
# No position -> no inventory skew.
assert skew == 0.0

def test_bbo_mode_at_best_bid_ask(self):
s, _om, md, _ = _make_strategy()
Expand All @@ -136,12 +138,16 @@ def test_bbo_mode_at_best_bid_ask(self):
s._get_coin_offset = lambda coin: 0.0
s._calculate_microprice_offsets = lambda coin, off: (off, off)
s._calculate_inventory_skew = lambda coin, mid: 0.0
# Vol buffer is updated inside _compute_ideal_prices in BBO mode;
# stub it out to keep this test focused on price math.
s._record_mid_price = lambda coin, mid: None

result = s._compute_ideal_prices("BTC")
assert result is not None
buy, sell = result
buy, sell, skew = result
assert abs(buy - 99.95) < 1e-6
assert abs(sell - 100.05) < 1e-6
assert skew == 0.0


class TestRunLoopTolerancePath:
Expand All @@ -156,8 +162,8 @@ def test_disabled_uses_legacy_cancel_stale_orders(self):
# Simulate the run-loop dispatch directly (without the surrounding
# boilerplate of MarketMakingStrategy.run): tolerance is 0, so the
# legacy method should be chosen.
ideal = s._compute_ideal_prices("BTC")
if s.refresh_tolerance_bp > 0:
ideal = s._compute_ideal_prices("BTC")
tracker.refresh_orders_with_tolerance(
"BTC",
ideal_prices={"B": ideal[0], "A": ideal[1]},
Expand All @@ -173,15 +179,20 @@ def test_disabled_uses_legacy_cancel_stale_orders(self):

def test_enabled_uses_refresh_with_tolerance(self):
"""``refresh_tolerance_bp > 0`` -> ``refresh_orders_with_tolerance`` is invoked."""
from order_manager import OrderSide

s, _om, md, tracker = _make_strategy(refresh_tolerance_bp=2.0)
market_data = _market_data(mid=100.0, bid=99.99, ask=100.01)
md.get_market_data.return_value = market_data

ideal = s._compute_ideal_prices("BTC")
if s.refresh_tolerance_bp > 0:
ideal = s._compute_ideal_prices("BTC")
tracker.refresh_orders_with_tolerance(
"BTC",
ideal_prices={"B": ideal[0], "A": ideal[1]},
ideal_prices={
OrderSide.BUY.value: ideal[0],
OrderSide.SELL.value: ideal[1],
},
tolerance_bp=s.refresh_tolerance_bp,
max_age_seconds=s.refresh_max_age_seconds,
close_oid=None,
Expand All @@ -193,11 +204,122 @@ def test_enabled_uses_refresh_with_tolerance(self):
call_kwargs = tracker.refresh_orders_with_tolerance.call_args.kwargs
assert call_kwargs["tolerance_bp"] == 2.0
assert call_kwargs["max_age_seconds"] == 120.0
assert "B" in call_kwargs["ideal_prices"]
assert "A" in call_kwargs["ideal_prices"]
assert OrderSide.BUY.value in call_kwargs["ideal_prices"]
assert OrderSide.SELL.value in call_kwargs["ideal_prices"]
tracker.cancel_stale_orders.assert_not_called()


class TestSingleComputePerCycle:
"""The two should-fix items from PR #144 review:

1. ``_calculate_inventory_skew`` is computed once per cycle.
2. ``_record_mid_price`` (vol buffer update) fires once per cycle, so
the run-loop tolerance check and the actual placement evaluate
against the same buffer state.
"""

def _stub_for_place_orders(self, s):
s._calculate_microprice_offsets = lambda coin, off: (off, off)
s._get_coin_offset = lambda coin: 0.0
s._get_coin_spread = lambda coin: s.spread_bps
s._get_hourly_spread_multiplier = lambda: 1.0
s.calculate_position_size = lambda coin, signal: 1.0

def test_inventory_skew_called_once_when_threading_ideal_prices(self):
"""Threading ``ideal_prices`` into ``_place_orders`` skips the
redundant inventory-skew computation."""
s, _om, md, _tracker = _make_strategy(refresh_tolerance_bp=2.0)
self._stub_for_place_orders(s)
market_data = _market_data(mid=100.0, bid=99.99, ask=100.01)
md.get_market_data.return_value = market_data
md.round_size.return_value = 1.0
s.order_manager.bulk_place_orders.return_value = []

skew_calls = {"n": 0}

def counting_skew(_coin, _mid):
skew_calls["n"] += 1
return 0.0

s._calculate_inventory_skew = counting_skew
s._record_mid_price = lambda coin, mid: None # not BBO mode here

# Cycle simulation: run-loop computes once, then passes to placement.
ideal = s._compute_ideal_prices("BTC")
assert skew_calls["n"] == 1
s._place_orders("BTC", ideal_prices=ideal)
# _place_orders must not recompute the skew.
assert skew_calls["n"] == 1

def test_inventory_skew_recomputed_when_ideal_prices_omitted(self):
"""Legacy callers that omit ``ideal_prices`` still get a working
path that computes prices internally (one skew call total)."""
s, _om, md, _tracker = _make_strategy(refresh_tolerance_bp=0.0)
self._stub_for_place_orders(s)
market_data = _market_data(mid=100.0, bid=99.99, ask=100.01)
md.get_market_data.return_value = market_data
md.round_size.return_value = 1.0
s.order_manager.bulk_place_orders.return_value = []

skew_calls = {"n": 0}

def counting_skew(_coin, _mid):
skew_calls["n"] += 1
return 0.0

s._calculate_inventory_skew = counting_skew
s._record_mid_price = lambda coin, mid: None

# No ``ideal_prices`` kwarg -> internal compute path.
s._place_orders("BTC")
assert skew_calls["n"] == 1

def test_record_mid_price_fires_once_per_cycle(self):
"""In BBO mode, the volatility buffer is updated exactly once per
cycle even when ``_compute_ideal_prices`` is invoked from the
run-loop tolerance check and the result is threaded into
``_place_orders``."""
s, _om, md, _tracker = _make_strategy(refresh_tolerance_bp=2.0)
self._stub_for_place_orders(s)
s.bbo_mode = True
market_data = _market_data(mid=100.0, bid=99.95, ask=100.05)
md.get_market_data.return_value = market_data
md.round_size.return_value = 1.0
s._calculate_inventory_skew = lambda coin, mid: 0.0
s.order_manager.bulk_place_orders.return_value = []

record_calls = []
s._record_mid_price = lambda coin, mid: record_calls.append((coin, mid))

# Cycle simulation.
ideal = s._compute_ideal_prices("BTC")
s._place_orders("BTC", ideal_prices=ideal)

# Buffer updated exactly once -- inside _compute_ideal_prices.
assert len(record_calls) == 1
assert record_calls[0] == ("BTC", 100.0)

def test_record_mid_price_skipped_when_not_bbo_mode(self):
"""Non-BBO mode never updates the rolling volatility buffer
(preserves pre-PR behaviour)."""
s, _om, md, _tracker = _make_strategy(refresh_tolerance_bp=2.0)
self._stub_for_place_orders(s)
s.bbo_mode = False
market_data = _market_data(mid=100.0, bid=99.99, ask=100.01)
md.get_market_data.return_value = market_data
md.round_size.return_value = 1.0
s._calculate_inventory_skew = lambda coin, mid: 0.0
s.order_manager.bulk_place_orders.return_value = []

record_calls = []
s._record_mid_price = lambda coin, mid: record_calls.append((coin, mid))

ideal = s._compute_ideal_prices("BTC")
s._place_orders("BTC", ideal_prices=ideal)

assert record_calls == []


class TestPlaceOrdersOpenSidesGating:
"""``_place_orders`` skips a side that already has a kept tracked order."""

Expand Down
Loading