diff --git a/README.md b/README.md index 4dd1f6a..4c0f62b 100644 --- a/README.md +++ b/README.md @@ -363,6 +363,8 @@ The `market_making` strategy uses **progressive close pricing**: as a position a **Adverse selection logging** (`--enable-adverse-selection-log`): Measures mid-price movement 5s/30s/60s after each fill, logging per-coin summaries every 300s. Observation only — no trading impact. +**Order rejection log aggregation** (`--rejection-log-level`, `--rejection-summary-interval`): Routine post-only rejections (`Post only order would have immediately matched`) are an expected retry signal under maker-only quoting, but they were historically logged at ERROR — drowning out genuine errors as MM size grows. The strategy now classifies each rejection by API error text and adds a single `[reject-summary]` INFO line every `--rejection-summary-interval` seconds (default 300, set to 0 to disable). The default per-rejection log level stays `error` and emits the same byte-identical `Order rejected: …` line as before, so existing log scrapers and ERROR-rate alerts keep working. Flipping `--rejection-log-level warning` (or `info`) opts into a richer `[reject:tag] coin — …` categorised format at the chosen level; unknown rejection text always falls through to ERROR with the legacy line so format changes / new reject reasons stay visible. + **Dynamic offset** (`--dynamic-offset`): Auto-adjusts per-coin BBO offset based on adverse selection severity from the tracker. Coins with higher adverse selection get wider offsets; favorable coins get tighter offsets. Requires `--enable-ws` and `--enable-adverse-selection-log`. Manual `--coin-offset-overrides` serve as the baseline; dynamic adjustment adds/subtracts from it. **Dynamic position age** (`--dynamic-age`): Adjusts `MAX_POSITION_AGE` per coin based on recent volatility. High-volatility coins get shorter holding times (reducing adverse selection risk), while low-volatility coins get longer times (improving maker fill probability). Uses the same mid-price history as `--vol-adjust`. Configure `--dynamic-age-baseline-vol` (bps) as the "normal" volatility reference, and `--dynamic-age-min` / `--dynamic-age-max` (seconds) for clamping bounds. Falls back to the fixed `--max-position-age` when data is insufficient. @@ -686,6 +688,8 @@ ws_guards: # All require --enable-ws velocity_min_move_bps: 1.0 # --velocity-min-move-bps (min cumulative move in bps to trigger) enable_adverse_selection_log: false # --enable-adverse-selection-log (post-fill mid tracking) adverse_selection_log_interval: 300 # --adverse-selection-log-interval (summary log interval in seconds) + rejection_log_level: "error" # --rejection-log-level (level for routine post-only rejections: error/warning/info/debug; default error preserves legacy behaviour) + rejection_summary_interval: 300 # --rejection-summary-interval (aggregate rejection summary cadence in seconds; 0 disables) config_merge_order: "default_configs[strategy] ← CLI overrides (only non-null)" priority: "CLI flag > env var > default_configs > strategy constructor fallback" diff --git a/bot.py b/bot.py index 50cd0d8..b9e1340 100644 --- a/bot.py +++ b/bot.py @@ -163,6 +163,8 @@ 'forager_weight_activity', 'forager_weight_quality', 'forager_weight_cost', + 'rejection_log_level', + 'rejection_summary_interval', 'drain_flag_file', ], } @@ -1253,6 +1255,15 @@ def stop(self): help='Enable post-fill adverse selection measurement logging') parser.add_argument('--adverse-selection-log-interval', type=float, help='Adverse selection summary log interval in seconds (default: 300)') + parser.add_argument('--rejection-log-level', type=str, + choices=['error', 'warning', 'info', 'debug'], + help='Log level for routine post-only order rejections ' + '(default: error — preserves legacy behaviour). ' + 'Set to "warning" to reduce ERROR noise once the ' + '5min summary line is trusted.') + parser.add_argument('--rejection-summary-interval', type=float, + help='Order rejection aggregate summary interval in seconds; ' + '0 disables the summary (default: 300)') parser.add_argument('--coin-offset-overrides', type=str, default='', help='Per-coin BBO offset overrides in bps (e.g. "SP500:0.5,MSFT:3")') parser.add_argument('--coin-spread-overrides', type=str, default='', diff --git a/order_manager.py b/order_manager.py index ebcc7e6..79a4e91 100644 --- a/order_manager.py +++ b/order_manager.py @@ -1,6 +1,6 @@ import logging from collections import defaultdict -from typing import Dict, List, Optional +from typing import TYPE_CHECKING, Dict, List, Optional from dataclasses import dataclass from datetime import datetime from enum import Enum @@ -12,6 +12,9 @@ from coin_utils import is_hip3, parse_coin from ttl_cache import TTLCacheEntry, TTLCacheMap +if TYPE_CHECKING: + from order_rejection_tracker import OrderRejectionTracker + logger = logging.getLogger(__name__) @@ -90,6 +93,30 @@ def __init__(self, exchange: Exchange, info: Info, account_address: str, # Not thread-safe; bot runs single-threaded. self._open_orders_cache: TTLCacheEntry[List[Dict]] = TTLCacheEntry(user_state_cache_ttl) + # Optional rejection tracker: when set, routine post-only rejections + # are routed through it for log-level downgrade and aggregation. + # When ``None`` the legacy ERROR-level path is used (unchanged). + self._rejection_tracker: Optional["OrderRejectionTracker"] = None + + def set_rejection_tracker(self, tracker: "OrderRejectionTracker") -> None: + """Wire a tracker so future rejections are classified and aggregated. + + When unset, the legacy ERROR-level path is preserved exactly. + """ + self._rejection_tracker = tracker + + def _record_rejection(self, coin: str, error_msg: str, prefix: str = "Order rejected") -> None: + """Route a rejection through the tracker if registered, else log ERROR. + + The tracker handles its own logging at the configured level for + routine matches; for unknown patterns it forwards to ERROR. With + no tracker we preserve the historical ``logger.error`` line. + """ + if self._rejection_tracker is not None: + self._rejection_tracker.record(coin, error_msg) + else: + logger.error(f"{prefix}: {error_msg}") + def _invalidate_open_orders_cache(self) -> None: """Clear cached open orders after a write operation (place/cancel).""" self._open_orders_cache.invalidate() @@ -392,9 +419,7 @@ def _place_order(self, order: Order) -> Optional[Order]: return order if 'error' in status_info: - logger.error( - "Order rejected: %s", status_info['error'] - ) + self._record_rejection(order.coin, status_info['error']) order.status = OrderStatus.REJECTED return None @@ -495,9 +520,10 @@ def _bulk_place_orders_with_builder( break if 'error' in status_info: - logger.error( - "Bulk order [%d] rejected: %s", - i, status_info['error'], + self._record_rejection( + orders[i].coin, + status_info['error'], + prefix=f"Bulk order [{i}] rejected", ) orders[i].status = OrderStatus.REJECTED continue diff --git a/order_rejection_tracker.py b/order_rejection_tracker.py new file mode 100644 index 0000000..b92c559 --- /dev/null +++ b/order_rejection_tracker.py @@ -0,0 +1,258 @@ +"""Order rejection classification and aggregation. + +The bot's order placement path (``order_manager.py``) historically emits +an ERROR-level log for *every* rejection returned by the Hyperliquid +exchange, regardless of whether the rejection signals a routine retry +condition (e.g. post-only orders that would have crossed the BBO) or a +genuine error condition. Under live MM load the routine post-only +rejection accounts for nearly all ERROR records, drowning out real +anomalies in monitoring. + +This module reclassifies each rejection by matching the API error text +against a static set of known "routine" patterns. Matched rejections can +be downgraded to WARNING / INFO via config, and the tracker accumulates +per-coin counts that are flushed as a single summary line every +``rejection_summary_interval`` seconds. + +The tracker is *opt-in*: ``order_manager.py`` only invokes it when one +has been registered via ``set_rejection_tracker``. With no tracker the +legacy ERROR-level path is preserved exactly. + +Public surface: + +* :class:`OrderRejectionTracker` — main tracker. +* :func:`classify_rejection` — pure helper used by tests. + +Both are intentionally cheap on the order-placement hot path: ``record`` +is O(1) under a short-lived lock, and the periodic summary is driven +externally by the strategy's main loop (no background thread). +""" + +import logging +import re +import threading +import time +from collections import defaultdict +from dataclasses import dataclass +from typing import Dict, Optional, Tuple + +logger = logging.getLogger(__name__) + + +# Pattern matcher → tag. Patterns are evaluated in order; current entries +# are mutually exclusive on the API's error text format. Adding a new +# routine pattern (e.g. ``Reduce only``) is a one-line edit here plus a +# regression test in ``tests/test_order_rejection_tracker.py``. +_ROUTINE_PATTERNS: Tuple[Tuple["re.Pattern[str]", str], ...] = ( + (re.compile(r"Post only order would have immediately matched"), + "post_only_match"), +) + +# Mapping from log-level config string to logging level constant. Keys +# are normalised to lowercase before lookup so config values like +# ``"WARNING"`` continue to work. +_LEVEL_MAP: Dict[str, int] = { + "error": logging.ERROR, + "warning": logging.WARNING, + "info": logging.INFO, + "debug": logging.DEBUG, +} + +# Allowed config string values, exposed for validators. +ALLOWED_LOG_LEVELS = tuple(_LEVEL_MAP.keys()) + +# Tag used for unmatched rejection text. Always logged at ERROR so a +# format change on the exchange surfaces immediately. +UNKNOWN_TAG = "unknown" + + +def classify_rejection(msg: str) -> str: + """Return the routine tag for *msg*, or :data:`UNKNOWN_TAG` if none match. + + Pure function exposed for tests so the pattern map can be exercised + without instantiating a tracker. + """ + for pat, tag in _ROUTINE_PATTERNS: + if pat.search(msg): + return tag + return UNKNOWN_TAG + + +def _extract_bbo(msg: str) -> str: + """Best-effort extraction of the ``"bid@ask"`` snippet from a reject text. + + Returns an empty string when the pattern is absent (e.g. older HL + error messages or future format changes). + """ + m = re.search(r"bbo was ([\d.]+@[\d.]+)", msg) + return m.group(1) if m else "" + + +@dataclass +class _CoinStats: + count: int = 0 + first_ts: float = 0.0 + last_ts: float = 0.0 + last_bbo: str = "" + + +class OrderRejectionTracker: + """Classify and aggregate order-rejection events. + + Public methods are thread-safe. ``record`` is on the order-placement + hot path and intentionally O(1). + """ + + def __init__( + self, + routine_log_level: str = "error", + summary_interval: float = 300.0, + ) -> None: + self._level: int = _LEVEL_MAP.get( + (routine_log_level or "error").lower(), logging.ERROR + ) + self._summary_interval: float = float(summary_interval) + + # tag -> coin -> _CoinStats. Reset after each summary flush. + self._stats: Dict[str, Dict[str, _CoinStats]] = defaultdict( + lambda: defaultdict(_CoinStats) + ) + self._unknown_count: int = 0 + self._lock = threading.Lock() + self._last_summary_ts: float = time.monotonic() + + # ------------------------------------------------------------------ # + # Recording + # ------------------------------------------------------------------ # + def record(self, coin: str, raw_msg: str) -> str: + """Record a rejection. Returns the matched tag, or :data:`UNKNOWN_TAG`. + + Emits the per-rejection log at the configured level for routine + matches; unknown patterns are forwarded to ERROR so format + changes / new reject reasons remain visible. + + At the default ERROR level the line is byte-identical to the + legacy ``logger.error("Order rejected: …")`` produced by + ``order_manager.py`` so existing log scrapers / ERROR-rate alert + templates continue to match. The richer + ``[reject:tag] coin — …`` format is reserved for the opt-in + downgraded levels (warning / info / debug). + """ + tag = classify_rejection(raw_msg) + bbo = _extract_bbo(raw_msg) + now_wall = time.time() + + if tag == UNKNOWN_TAG: + with self._lock: + self._unknown_count += 1 + # Legacy ``Order rejected: `` format preserved so + # back-compat with log scrapers holds for unknowns too. + logger.error(f"Order rejected: {raw_msg}") + return tag + + with self._lock: + stats = self._stats[tag][coin] + if stats.count == 0: + stats.first_ts = now_wall + stats.count += 1 + stats.last_ts = now_wall + stats.last_bbo = bbo + + # Logger call deliberately outside the lock: the lock guards + # only the in-memory counters. + if self._level == logging.ERROR: + # Default deployment: emit the legacy line verbatim so back- + # compat with existing log scrapers / templated alerts holds. + logger.error(f"Order rejected: {raw_msg}") + else: + # Downgraded — operator has opted in, give them the richer + # categorised line that carries coin + tag for grep-ability. + logger.log(self._level, f"[reject:{tag}] {coin} — {raw_msg}") + return tag + + # ------------------------------------------------------------------ # + # Periodic summary + # ------------------------------------------------------------------ # + def log_summary_if_due(self, now_monotonic: Optional[float] = None) -> bool: + """Emit a summary line if the configured interval has elapsed. + + Returns ``True`` iff a summary line was actually emitted, ``False`` + when: + + * ``summary_interval <= 0`` (feature disabled), or + * the interval has not elapsed yet, or + * the interval elapsed but no rejections occurred (counters were + flushed silently — the strategy main loop discards the return + value, but tests / external observers can rely on this signal). + """ + if self._summary_interval <= 0: + return False + now = now_monotonic if now_monotonic is not None else time.monotonic() + if now - self._last_summary_ts < self._summary_interval: + return False + + with self._lock: + snapshot = self._stats + unknown = self._unknown_count + self._stats = defaultdict(lambda: defaultdict(_CoinStats)) + self._unknown_count = 0 + self._last_summary_ts = now + + return self._emit_summary(snapshot, unknown) + + # ------------------------------------------------------------------ # + # Internal helpers + # ------------------------------------------------------------------ # + def _emit_summary( + self, + snapshot: Dict[str, Dict[str, _CoinStats]], + unknown: int, + ) -> bool: + """Emit summary lines and return ``True`` iff anything was logged. + + Returns ``False`` when both *snapshot* and *unknown* are empty — + the helper stays silent during idle periods rather than emitting + a meaningless ``total=0`` line every interval. + """ + any_emitted = False + for tag, by_coin in snapshot.items(): + if not by_coin: + continue + total = sum(s.count for s in by_coin.values()) + ranked = sorted( + by_coin.items(), key=lambda kv: kv[1].count, reverse=True + ) + top = ", ".join(f"{coin}={s.count}" for coin, s in ranked[:8]) + sample_bbo = ranked[0][1].last_bbo if ranked else "" + logger.info( + f"[reject-summary] tag={tag} window={self._summary_interval:.0f}s " + f"total={total} coins={len(by_coin)} top=[{top}] " + f"sample_bbo={sample_bbo}" + ) + any_emitted = True + if unknown > 0: + logger.warning( + f"[reject-summary] tag=unknown window={self._summary_interval:.0f}s " + f"count={unknown} (unknown patterns logged at ERROR individually)" + ) + any_emitted = True + return any_emitted + + # ------------------------------------------------------------------ # + # Operational helpers (read-only) + # ------------------------------------------------------------------ # + def get_stats_snapshot(self) -> Dict[str, Dict[str, int]]: + """Read-only counter snapshot for tests / external observers. + + Does not reset internal state (use :meth:`log_summary_if_due` + for flush semantics). + """ + with self._lock: + return { + tag: {coin: stats.count for coin, stats in by_coin.items()} + for tag, by_coin in self._stats.items() + } + + def get_unknown_count(self) -> int: + with self._lock: + return self._unknown_count diff --git a/strategies/market_making_strategy.py b/strategies/market_making_strategy.py index 4663d2e..3a69ad1 100644 --- a/strategies/market_making_strategy.py +++ b/strategies/market_making_strategy.py @@ -29,6 +29,10 @@ from strategies.mm_position_closer import PositionCloser from coin_utils import parse_coin from order_manager import BBO_OFFSET, OrderSide, round_price +from order_rejection_tracker import ( + ALLOWED_LOG_LEVELS, + OrderRejectionTracker, +) from rate_limiter import API_ERRORS logger = logging.getLogger(__name__) @@ -145,6 +149,36 @@ def __init__(self, market_data_manager, order_manager, config: Dict) -> None: f"cooldown={self.cfg.forager.cooldown_seconds}s" ) + # ---- Order rejection tracker (log downgrade + 5min summary) ---- # + # Routes routine post-only rejections through a classifier so the + # log level can be tuned per deployment and per-coin counts get + # aggregated into a single ``[reject-summary]`` line every + # ``rejection_summary_interval`` seconds. With the default + # log_level=error this preserves the legacy behaviour exactly, + # while the summary itself is purely additive. + rejection_log_level: str = str( + config.get('rejection_log_level', 'error') or 'error' + ).lower() + if rejection_log_level not in ALLOWED_LOG_LEVELS: + logger.warning( + f"[mm] Unknown rejection_log_level={rejection_log_level!r}, " + f"falling back to 'error'" + ) + rejection_log_level = 'error' + rejection_summary_interval: float = float( + config.get('rejection_summary_interval', 300.0) + ) + self._rejection_tracker: OrderRejectionTracker = OrderRejectionTracker( + routine_log_level=rejection_log_level, + summary_interval=rejection_summary_interval, + ) + self.order_manager.set_rejection_tracker(self._rejection_tracker) + if rejection_log_level != 'error' or rejection_summary_interval > 0: + logger.info( + f"[mm] Rejection tracker armed: log_level={rejection_log_level}, " + f"summary_interval={rejection_summary_interval}s" + ) + # ---- Per-coin offset/spread/size overrides (aliases of self.cfg.per_coin) ---- # self._coin_offset_overrides: Dict[str, float] = self.cfg.per_coin.offset self._coin_spread_overrides: Dict[str, float] = self.cfg.per_coin.spread @@ -338,6 +372,7 @@ def run(self, coins: List[str]) -> None: self._log_fill_rate() self._log_dynamic_age() self._closer.log_close_stats() + self._rejection_tracker.log_summary_if_due() # ---- Drain mode: pre-shutdown graceful close ---- # # Drain takes precedence over quiet hours: when an external diff --git a/tests/test_mm_adverse_selection_guards.py b/tests/test_mm_adverse_selection_guards.py index eaae438..2ecfe21 100644 --- a/tests/test_mm_adverse_selection_guards.py +++ b/tests/test_mm_adverse_selection_guards.py @@ -69,6 +69,9 @@ def _make_strategy(imbalance_threshold=0.0, loss_streak_limit=0, loss_streak_coo closer.get_close_oid.return_value = None s._closer = closer + s._rejection_tracker = MagicMock() + s._rejection_tracker.log_summary_if_due.return_value = False + return s, om, md diff --git a/tests/test_mm_cancel_on_fill.py b/tests/test_mm_cancel_on_fill.py index 11b8290..e6c1183 100644 --- a/tests/test_mm_cancel_on_fill.py +++ b/tests/test_mm_cancel_on_fill.py @@ -47,6 +47,8 @@ def _make_strategy(): strategy._tracker = tracker strategy._closer = MagicMock() strategy._closer.tracked_coins = set() + strategy._rejection_tracker = MagicMock() + strategy._rejection_tracker.log_summary_if_due.return_value = False strategy.update_positions = MagicMock() strategy.market_data = MagicMock() diff --git a/tests/test_mm_cycle_log.py b/tests/test_mm_cycle_log.py index d2baa36..8a11f0b 100644 --- a/tests/test_mm_cycle_log.py +++ b/tests/test_mm_cycle_log.py @@ -69,6 +69,9 @@ def _make_strategy(inventory_skew_bps=2, order_size_usd=100): closer.get_close_oid.return_value = None s._closer = closer + s._rejection_tracker = MagicMock() + s._rejection_tracker.log_summary_if_due.return_value = False + return s, om, md diff --git a/tests/test_order_manager_rejection_routing.py b/tests/test_order_manager_rejection_routing.py new file mode 100644 index 0000000..a8a07b3 --- /dev/null +++ b/tests/test_order_manager_rejection_routing.py @@ -0,0 +1,273 @@ +"""Integration tests: ``OrderManager`` ↔ ``OrderRejectionTracker``. + +Pin the contract that: + +1. With **no tracker** registered, the legacy ``logger.error`` line is + emitted exactly as before. This is the load-bearing back-compat + guarantee — any deployment that doesn't opt in must see byte-identical + behaviour. +2. With a tracker registered, every rejection is routed through it + (single order *and* bulk paths), no duplicate ERROR is emitted, and + the tracker counts the rejection per coin. + +The tests intentionally reach into ``OrderManager._place_order`` / +``_bulk_place_orders_with_builder`` via the public ``bulk_place_orders`` +shim and a single-order helper because both paths share the rejection +hook and we want regression coverage on both. +""" + +import logging +from unittest.mock import MagicMock, patch + +import pytest + +from order_manager import Order, OrderManager, OrderSide, OrderStatus +from order_rejection_tracker import OrderRejectionTracker + + +_POST_ONLY = ( + "Post only order would have immediately matched, " + "bbo was 98.73@98.758. asset=170005" +) + + +@pytest.fixture(autouse=True) +def _bypass_api_wrapper(): + with patch('order_manager.api_wrapper') as mock_wrapper: + mock_wrapper.call.side_effect = lambda fn, *a, **kw: fn(*a, **kw) + yield mock_wrapper + + +def _make_order_manager(): + exchange = MagicMock() + info = MagicMock() + return OrderManager(exchange, info, '0xabc') + + +def _make_order(coin='xyz:NVDA', side=OrderSide.BUY): + return Order( + id=None, coin=coin, side=side, size=0.1, price=100.0, + order_type={"limit": {"tif": "Alo"}}, reduce_only=False, + ) + + +# --------------------------------------------------------------------- # +# Single-order rejection path +# --------------------------------------------------------------------- # + + +class TestSingleRejectionWithoutTracker: + """Legacy behaviour: ERROR-level ``logger.error`` line is emitted.""" + + def test_single_rejection_logs_error(self, caplog): + mgr = _make_order_manager() + mgr.exchange.order.return_value = { + 'status': 'ok', + 'response': {'data': {'statuses': [{'error': _POST_ONLY}]}}, + } + + with caplog.at_level(logging.DEBUG, logger='order_manager'): + result = mgr._place_order(_make_order()) + + assert result is None + # Legacy line: exactly one ERROR with "Order rejected:" prefix. + errs = [ + r for r in caplog.records + if r.levelno == logging.ERROR + and r.message.startswith("Order rejected: ") + ] + assert len(errs) == 1 + assert _POST_ONLY in errs[0].message + + +class TestSingleRejectionWithTracker: + """With tracker: routine match downgraded, counter incremented.""" + + def test_single_rejection_routed_through_tracker(self, caplog): + mgr = _make_order_manager() + tracker = OrderRejectionTracker( + routine_log_level='warning', summary_interval=0, + ) + mgr.set_rejection_tracker(tracker) + + mgr.exchange.order.return_value = { + 'status': 'ok', + 'response': {'data': {'statuses': [{'error': _POST_ONLY}]}}, + } + + with caplog.at_level(logging.DEBUG): + result = mgr._place_order(_make_order(coin='xyz:NVDA')) + + assert result is None + # Tracker recorded the rejection + snap = tracker.get_stats_snapshot() + assert snap['post_only_match']['xyz:NVDA'] == 1 + + # No legacy ERROR line should have been emitted by order_manager. + legacy_errs = [ + r for r in caplog.records + if r.name == 'order_manager' + and r.levelno == logging.ERROR + and r.message.startswith("Order rejected:") + ] + assert legacy_errs == [], ( + "order_manager must defer to tracker, not double-log" + ) + + # Tracker emits its own line at the configured WARNING level. + tracker_lines = [ + r for r in caplog.records + if r.name == 'order_rejection_tracker' + and "[reject:post_only_match]" in r.message + ] + assert len(tracker_lines) == 1 + assert tracker_lines[0].levelno == logging.WARNING + + +class TestSingleRejectionWithTrackerAtDefaultError: + """Back-compat: tracker registered at default ERROR level emits the + legacy ``Order rejected: ...`` line (no categorised prefix), so log + scrapers / templated alerts that key off that exact prefix continue + to match. The categorised ``[reject:tag]`` format is reserved for + opt-in downgraded levels. + """ + + def test_default_error_level_emits_legacy_format(self, caplog): + mgr = _make_order_manager() + tracker = OrderRejectionTracker( + routine_log_level='error', summary_interval=0, + ) + mgr.set_rejection_tracker(tracker) + + mgr.exchange.order.return_value = { + 'status': 'ok', + 'response': {'data': {'statuses': [{'error': _POST_ONLY}]}}, + } + + with caplog.at_level(logging.DEBUG): + mgr._place_order(_make_order(coin='xyz:NVDA')) + + # Counter still incremented (so summary aggregation works). + assert tracker.get_stats_snapshot()['post_only_match']['xyz:NVDA'] == 1 + + # Legacy line emitted at ERROR — matches what order_manager would + # produce without any tracker registered. + legacy = [ + r for r in caplog.records + if r.message == f"Order rejected: {_POST_ONLY}" + and r.levelno == logging.ERROR + ] + assert len(legacy) == 1 + + # Categorised format must NOT appear at the default level. + categorised = [ + r for r in caplog.records if "[reject:post_only_match]" in r.message + ] + assert categorised == [] + + +class TestUnknownPatternRouted: + """Unknown text is still surfaced at ERROR via the tracker path.""" + + def test_unknown_text_via_tracker_logs_error(self, caplog): + mgr = _make_order_manager() + tracker = OrderRejectionTracker( + routine_log_level='info', summary_interval=0, + ) + mgr.set_rejection_tracker(tracker) + + unknown = "Some genuinely new exchange rejection" + mgr.exchange.order.return_value = { + 'status': 'ok', + 'response': {'data': {'statuses': [{'error': unknown}]}}, + } + + with caplog.at_level(logging.DEBUG): + mgr._place_order(_make_order(coin='xyz:NVDA')) + + # Tracker logs ERROR for unknown patterns regardless of configured + # routine level; the unknown counter is incremented. + assert tracker.get_unknown_count() == 1 + errs = [ + r for r in caplog.records + if r.name == 'order_rejection_tracker' + and r.levelno == logging.ERROR + ] + assert len(errs) == 1 + + +# --------------------------------------------------------------------- # +# Bulk-order rejection path +# --------------------------------------------------------------------- # + + +class TestBulkRejectionWithoutTracker: + def test_bulk_rejection_logs_error_with_index(self, caplog): + mgr = _make_order_manager() + mgr.exchange.bulk_orders.return_value = { + 'status': 'ok', + 'response': {'data': {'statuses': [ + {'resting': {'oid': 100}}, + {'error': _POST_ONLY}, + ]}}, + } + + orders = [_make_order(coin='xyz:NVDA'), _make_order(coin='xyz:TSLA')] + with caplog.at_level(logging.DEBUG, logger='order_manager'): + results = mgr.bulk_place_orders(orders) + + assert results[0] is not None + assert results[1] is None + assert orders[1].status == OrderStatus.REJECTED + + errs = [ + r for r in caplog.records + if r.levelno == logging.ERROR + and "Bulk order [1] rejected" in r.message + ] + assert len(errs) == 1 + + +class TestBulkRejectionWithTracker: + def test_bulk_rejection_routed_per_coin(self, caplog): + mgr = _make_order_manager() + tracker = OrderRejectionTracker( + routine_log_level='warning', summary_interval=0, + ) + mgr.set_rejection_tracker(tracker) + + mgr.exchange.bulk_orders.return_value = { + 'status': 'ok', + 'response': {'data': {'statuses': [ + {'error': _POST_ONLY}, # NVDA rejected + {'resting': {'oid': 200}}, # TSLA placed + {'error': _POST_ONLY}, # SP500 rejected + ]}}, + } + + orders = [ + _make_order(coin='xyz:NVDA'), + _make_order(coin='xyz:TSLA'), + _make_order(coin='xyz:SP500'), + ] + with caplog.at_level(logging.DEBUG): + results = mgr.bulk_place_orders(orders) + + assert results[0] is None + assert results[1] is not None and results[1].id == 200 + assert results[2] is None + + snap = tracker.get_stats_snapshot() + # Each rejection counted under its own coin + assert snap['post_only_match']['xyz:NVDA'] == 1 + assert snap['post_only_match']['xyz:SP500'] == 1 + assert 'xyz:TSLA' not in snap['post_only_match'] + + # No legacy ``Bulk order [N] rejected`` ERROR from order_manager + legacy = [ + r for r in caplog.records + if r.name == 'order_manager' + and r.levelno == logging.ERROR + and 'Bulk order' in r.message + ] + assert legacy == [] diff --git a/tests/test_order_rejection_tracker.py b/tests/test_order_rejection_tracker.py new file mode 100644 index 0000000..9907931 --- /dev/null +++ b/tests/test_order_rejection_tracker.py @@ -0,0 +1,320 @@ +"""Tests for ``OrderRejectionTracker``. + +Pin three load-bearing behaviours: + +1. **Pattern classification** is correct and unknown text falls back to + ERROR-level logging. The exchange's reject text format is the only + thing the tracker can key off of, so a regression here is silent + and dangerous. +2. **Log level downgrade** is exact: routine matches log at the + configured level (default ``error`` preserves legacy behaviour); a + typo / unknown level falls back to ``error``. +3. **Periodic summary** flushes counters atomically and aggregates + per-coin counts with the correct top-N ordering and bbo sample. + +Concurrency is also covered with a small thread fan-in to verify +``record`` is safe under contention. +""" + +import logging +import threading +import time + +from order_rejection_tracker import ( + ALLOWED_LOG_LEVELS, + UNKNOWN_TAG, + OrderRejectionTracker, + classify_rejection, +) + + +_POST_ONLY_MSG = ( + "Post only order would have immediately matched, " + "bbo was 98.73@98.758. asset=170005" +) +_UNKNOWN_MSG = "Some new exchange-side rejection nobody has seen before" + + +# --------------------------------------------------------------------- # +# classify_rejection: pure pattern dispatch +# --------------------------------------------------------------------- # + + +class TestClassifyRejection: + def test_post_only_matches(self): + assert classify_rejection(_POST_ONLY_MSG) == "post_only_match" + + def test_post_only_substring_anywhere(self): + assert classify_rejection( + f"prefix... {_POST_ONLY_MSG} ...suffix" + ) == "post_only_match" + + def test_empty_string_unknown(self): + assert classify_rejection("") == UNKNOWN_TAG + + def test_unknown_returns_unknown_tag(self): + assert classify_rejection(_UNKNOWN_MSG) == UNKNOWN_TAG + + def test_post_only_text_only_substring(self): + """Variations on the canonical message still match.""" + variant = "Post only order would have immediately matched" + assert classify_rejection(variant) == "post_only_match" + + +# --------------------------------------------------------------------- # +# record: counters, log levels +# --------------------------------------------------------------------- # + + +class TestRecord: + def test_post_only_returns_tag_and_counts(self): + t = OrderRejectionTracker(routine_log_level="error", summary_interval=300) + tag = t.record("xyz:NVDA", _POST_ONLY_MSG) + assert tag == "post_only_match" + snap = t.get_stats_snapshot() + assert snap["post_only_match"]["xyz:NVDA"] == 1 + + def test_unknown_returns_unknown_tag_and_increments_separately(self): + t = OrderRejectionTracker(summary_interval=0) + tag = t.record("xyz:NVDA", _UNKNOWN_MSG) + assert tag == UNKNOWN_TAG + # Unknowns are NOT routed into the per-tag counters + assert "unknown" not in t.get_stats_snapshot() + assert t.get_unknown_count() == 1 + + def test_default_log_level_emits_legacy_line_at_error(self, caplog): + """At default ERROR level the line is byte-identical to the + legacy ``order_manager`` log so back-compat with log scrapers + holds. The richer ``[reject:tag]`` format is reserved for + opt-in downgraded levels.""" + t = OrderRejectionTracker() + with caplog.at_level(logging.DEBUG, logger="order_rejection_tracker"): + t.record("xyz:NVDA", _POST_ONLY_MSG) + legacy = [ + r for r in caplog.records + if r.message == f"Order rejected: {_POST_ONLY_MSG}" + ] + assert len(legacy) == 1 + assert legacy[0].levelno == logging.ERROR + # The categorised format must NOT appear at the default level. + categorised = [ + r for r in caplog.records if "[reject:post_only_match]" in r.message + ] + assert categorised == [] + + def test_log_level_downgrade_to_warning(self, caplog): + t = OrderRejectionTracker(routine_log_level="warning", summary_interval=0) + with caplog.at_level(logging.DEBUG, logger="order_rejection_tracker"): + t.record("xyz:NVDA", _POST_ONLY_MSG) + records = [ + r for r in caplog.records if "[reject:post_only_match]" in r.message + ] + assert len(records) == 1 + assert records[0].levelno == logging.WARNING + + def test_log_level_downgrade_to_info(self, caplog): + t = OrderRejectionTracker(routine_log_level="info", summary_interval=0) + with caplog.at_level(logging.DEBUG, logger="order_rejection_tracker"): + t.record("xyz:NVDA", _POST_ONLY_MSG) + records = [ + r for r in caplog.records if "[reject:post_only_match]" in r.message + ] + assert len(records) == 1 + assert records[0].levelno == logging.INFO + + def test_unknown_pattern_always_logs_at_error(self, caplog): + """Unknowns ignore the configured level — they must remain visible.""" + t = OrderRejectionTracker(routine_log_level="info", summary_interval=0) + with caplog.at_level(logging.DEBUG, logger="order_rejection_tracker"): + t.record("xyz:NVDA", _UNKNOWN_MSG) + err = [ + r for r in caplog.records + if r.levelno == logging.ERROR and "Order rejected" in r.message + ] + assert len(err) == 1 + + def test_unknown_log_level_string_falls_back_to_error(self, caplog): + """A typo'd config value must not crash and must not silently skip + logging — fall back to ERROR (legacy line format) so the + operator still sees output.""" + t = OrderRejectionTracker(routine_log_level="WARN_typo", summary_interval=0) + with caplog.at_level(logging.DEBUG, logger="order_rejection_tracker"): + t.record("xyz:NVDA", _POST_ONLY_MSG) + # Falls back to ERROR → emits the legacy "Order rejected: ..." line. + legacy = [ + r for r in caplog.records + if r.message == f"Order rejected: {_POST_ONLY_MSG}" + and r.levelno == logging.ERROR + ] + assert len(legacy) == 1 + + def test_uppercase_log_level_accepted(self): + """Config values like ``WARNING`` should still resolve.""" + t = OrderRejectionTracker(routine_log_level="WARNING", summary_interval=0) + # _level is private API but the test pins behaviour by exercising + # log emission rather than asserting on the attribute directly. + # Round-trip via record + caplog instead. + import logging as _logging + records = [] + handler = _logging.Handler() + handler.emit = records.append + logger = _logging.getLogger("order_rejection_tracker") + logger.addHandler(handler) + logger.setLevel(_logging.DEBUG) + try: + t.record("xyz:NVDA", _POST_ONLY_MSG) + finally: + logger.removeHandler(handler) + assert any(r.levelno == logging.WARNING for r in records) + + def test_first_ts_pinned_on_first_record(self): + t = OrderRejectionTracker(summary_interval=0) + t.record("xyz:NVDA", _POST_ONLY_MSG) + time.sleep(0.001) + t.record("xyz:NVDA", _POST_ONLY_MSG) + # Direct read of internal stats to verify first/last timestamps + # are tracked separately (used by summary line). + coin_stats = t._stats["post_only_match"]["xyz:NVDA"] + assert coin_stats.count == 2 + assert coin_stats.first_ts > 0 + assert coin_stats.last_ts >= coin_stats.first_ts + + +# --------------------------------------------------------------------- # +# log_summary_if_due +# --------------------------------------------------------------------- # + + +class TestSummary: + def test_not_due_returns_false(self, caplog): + t = OrderRejectionTracker(summary_interval=300) + t.record("xyz:NVDA", _POST_ONLY_MSG) + with caplog.at_level(logging.INFO, logger="order_rejection_tracker"): + assert t.log_summary_if_due() is False + # Counter NOT reset when not due + assert t.get_stats_snapshot()["post_only_match"]["xyz:NVDA"] == 1 + + def test_due_emits_summary_and_resets(self, caplog): + t = OrderRejectionTracker(summary_interval=300) + t.record("xyz:NVDA", _POST_ONLY_MSG) + t.record("xyz:NVDA", _POST_ONLY_MSG) + t.record("flx:SILVER", _POST_ONLY_MSG) + + # Force interval to elapse by passing an explicit monotonic value. + future = t._last_summary_ts + 301.0 + with caplog.at_level(logging.INFO, logger="order_rejection_tracker"): + assert t.log_summary_if_due(now_monotonic=future) is True + + # Summary line is INFO level + summaries = [ + r for r in caplog.records if "[reject-summary]" in r.message + ] + assert len(summaries) == 1 + msg = summaries[0].message + assert "tag=post_only_match" in msg + assert "total=3" in msg + assert "coins=2" in msg + # Top is sorted by count descending + assert "xyz:NVDA=2" in msg + assert "flx:SILVER=1" in msg + + # Counters reset + assert t.get_stats_snapshot() == {} + + def test_summary_interval_zero_disables(self, caplog): + t = OrderRejectionTracker(summary_interval=0) + t.record("xyz:NVDA", _POST_ONLY_MSG) + future = time.monotonic() + 1_000_000.0 + with caplog.at_level(logging.INFO, logger="order_rejection_tracker"): + assert t.log_summary_if_due(now_monotonic=future) is False + # Counter not reset because no flush happened + assert t.get_stats_snapshot()["post_only_match"]["xyz:NVDA"] == 1 + + def test_summary_includes_unknown_when_present(self, caplog): + t = OrderRejectionTracker(summary_interval=300) + t.record("xyz:NVDA", _UNKNOWN_MSG) + future = t._last_summary_ts + 301.0 + with caplog.at_level(logging.INFO, logger="order_rejection_tracker"): + t.log_summary_if_due(now_monotonic=future) + unknown_lines = [ + r for r in caplog.records + if "[reject-summary] tag=unknown" in r.message + ] + assert len(unknown_lines) == 1 + assert unknown_lines[0].levelno == logging.WARNING + assert "count=1" in unknown_lines[0].message + + def test_summary_skipped_when_no_events(self, caplog): + """An empty interval emits no summary line and the helper + returns False — the contract is "True iff a line was logged".""" + t = OrderRejectionTracker(summary_interval=300) + future = t._last_summary_ts + 301.0 + with caplog.at_level(logging.INFO, logger="order_rejection_tracker"): + emitted = t.log_summary_if_due(now_monotonic=future) + # No counters, so nothing was logged → False per the contract. + assert emitted is False + summaries = [ + r for r in caplog.records if "[reject-summary]" in r.message + ] + assert summaries == [] + # The internal cursor *was* still advanced so the next due check + # waits another full interval rather than re-firing every loop. + assert t._last_summary_ts == future + + def test_top_8_truncation(self, caplog): + t = OrderRejectionTracker(summary_interval=300) + # 12 distinct coins, one rejection each + for i in range(12): + t.record(f"coin{i}", _POST_ONLY_MSG) + future = t._last_summary_ts + 301.0 + with caplog.at_level(logging.INFO, logger="order_rejection_tracker"): + t.log_summary_if_due(now_monotonic=future) + msg = next( + r.message for r in caplog.records if "[reject-summary]" in r.message + ) + # 12 unique coins recorded but top=[...] is truncated to 8 entries. + assert "coins=12" in msg + top_segment = msg.split("top=[")[1].split("]")[0] + # Each ``coin=count`` entry contains exactly one ``=``. + assert top_segment.count("=") == 8 + + +# --------------------------------------------------------------------- # +# Concurrency +# --------------------------------------------------------------------- # + + +class TestConcurrency: + def test_concurrent_record_no_lost_increments(self): + t = OrderRejectionTracker(summary_interval=0) + N_THREADS = 10 + N_PER = 100 + + def worker(): + for _ in range(N_PER): + t.record("xyz:NVDA", _POST_ONLY_MSG) + + threads = [threading.Thread(target=worker) for _ in range(N_THREADS)] + for th in threads: + th.start() + for th in threads: + th.join() + + snap = t.get_stats_snapshot() + assert snap["post_only_match"]["xyz:NVDA"] == N_THREADS * N_PER + + +# --------------------------------------------------------------------- # +# Module-level constants +# --------------------------------------------------------------------- # + + +class TestModuleSurface: + def test_allowed_log_levels_include_error_warning_info(self): + for level in ("error", "warning", "info", "debug"): + assert level in ALLOWED_LOG_LEVELS + + def test_unknown_tag_value(self): + # Pin the wire-format tag name; downstream tooling (operation + # logs / dashboards) keys off this string. + assert UNKNOWN_TAG == "unknown" diff --git a/tests/test_strategy_validator.py b/tests/test_strategy_validator.py index 7cb3105..e7a7c5e 100644 --- a/tests/test_strategy_validator.py +++ b/tests/test_strategy_validator.py @@ -211,6 +211,50 @@ def test_taker_fallback_none_accepted(self): 'taker_fallback_age_seconds': None, }) is None + def test_rejection_log_level_valid_strings(self): + for level in ('error', 'warning', 'info', 'debug'): + assert validate_strategy_config('market_making', { + 'rejection_log_level': level, + }) is None + + def test_rejection_log_level_uppercase_accepted(self): + # Validator normalises case before checking against the allow-list. + assert validate_strategy_config('market_making', { + 'rejection_log_level': 'WARNING', + }) is None + + def test_rejection_log_level_invalid_value_rejected(self): + err = validate_strategy_config('market_making', { + 'rejection_log_level': 'critical', + }) + assert err is not None + assert 'rejection_log_level' in err + + def test_rejection_log_level_non_string_rejected(self): + err = validate_strategy_config('market_making', { + 'rejection_log_level': 42, + }) + assert err is not None + assert 'rejection_log_level' in err + + def test_rejection_summary_interval_zero_accepted(self): + # 0 disables the summary; must not be rejected. + assert validate_strategy_config('market_making', { + 'rejection_summary_interval': 0, + }) is None + + def test_rejection_summary_interval_positive_accepted(self): + assert validate_strategy_config('market_making', { + 'rejection_summary_interval': 300, + }) is None + + def test_rejection_summary_interval_negative_rejected(self): + err = validate_strategy_config('market_making', { + 'rejection_summary_interval': -1, + }) + assert err is not None + assert 'rejection_summary_interval' in err + class TestMultipleErrors: """Validate that multiple errors are reported at once.""" diff --git a/validation/strategy_validator.py b/validation/strategy_validator.py index e3c1d3f..74ad74c 100644 --- a/validation/strategy_validator.py +++ b/validation/strategy_validator.py @@ -293,6 +293,28 @@ def _validate_market_making(config: Dict) -> List[str]: errors.append( f"forager_activity_idle_min_seconds: must be >= 0, got {val}" ) + + # Order rejection tracker. + if 'rejection_log_level' in config: + val = config['rejection_log_level'] + if not isinstance(val, str): + errors.append( + f"rejection_log_level: expected string, got {type(val).__name__}" + ) + elif val.lower() not in ('error', 'warning', 'info', 'debug'): + errors.append( + f"rejection_log_level: must be one of error/warning/info/debug, got {val!r}" + ) + if 'rejection_summary_interval' in config: + val = config['rejection_summary_interval'] + if not isinstance(val, (int, float)): + errors.append( + f"rejection_summary_interval: expected number, got {type(val).__name__}" + ) + elif val < 0: + errors.append( + f"rejection_summary_interval: must be >= 0, got {val}" + ) return errors