Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ The `market_making` strategy uses **progressive close pricing**: as a position a

**Adverse selection logging** (`--enable-adverse-selection-log`): Measures mid-price movement 5s/30s/60s after each fill, logging per-coin summaries every 300s. Observation only — no trading impact.

**Order rejection log aggregation** (`--rejection-log-level`, `--rejection-summary-interval`): Routine post-only rejections (`Post only order would have immediately matched`) are an expected retry signal under maker-only quoting, but they were historically logged at ERROR — drowning out genuine errors as MM size grows. The strategy now classifies each rejection by API error text and adds a single `[reject-summary]` INFO line every `--rejection-summary-interval` seconds (default 300, set to 0 to disable). The default per-rejection log level stays `error` and emits the same byte-identical `Order rejected: …` line as before, so existing log scrapers and ERROR-rate alerts keep working. Flipping `--rejection-log-level warning` (or `info`) opts into a richer `[reject:tag] coin — …` categorised format at the chosen level; unknown rejection text always falls through to ERROR with the legacy line so format changes / new reject reasons stay visible.

**Dynamic offset** (`--dynamic-offset`): Auto-adjusts per-coin BBO offset based on adverse selection severity from the tracker. Coins with higher adverse selection get wider offsets; favorable coins get tighter offsets. Requires `--enable-ws` and `--enable-adverse-selection-log`. Manual `--coin-offset-overrides` serve as the baseline; dynamic adjustment adds/subtracts from it.

**Dynamic position age** (`--dynamic-age`): Adjusts `MAX_POSITION_AGE` per coin based on recent volatility. High-volatility coins get shorter holding times (reducing adverse selection risk), while low-volatility coins get longer times (improving maker fill probability). Uses the same mid-price history as `--vol-adjust`. Configure `--dynamic-age-baseline-vol` (bps) as the "normal" volatility reference, and `--dynamic-age-min` / `--dynamic-age-max` (seconds) for clamping bounds. Falls back to the fixed `--max-position-age` when data is insufficient.
Expand Down Expand Up @@ -686,6 +688,8 @@ ws_guards: # All require --enable-ws
velocity_min_move_bps: 1.0 # --velocity-min-move-bps (min cumulative move in bps to trigger)
enable_adverse_selection_log: false # --enable-adverse-selection-log (post-fill mid tracking)
adverse_selection_log_interval: 300 # --adverse-selection-log-interval (summary log interval in seconds)
rejection_log_level: "error" # --rejection-log-level (level for routine post-only rejections: error/warning/info/debug; default error preserves legacy behaviour)
rejection_summary_interval: 300 # --rejection-summary-interval (aggregate rejection summary cadence in seconds; 0 disables)

config_merge_order: "default_configs[strategy] ← CLI overrides (only non-null)"
priority: "CLI flag > env var > default_configs > strategy constructor fallback"
Expand Down
11 changes: 11 additions & 0 deletions bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@
'forager_weight_activity',
'forager_weight_quality',
'forager_weight_cost',
'rejection_log_level',
'rejection_summary_interval',
'drain_flag_file',
],
}
Expand Down Expand Up @@ -1253,6 +1255,15 @@ def stop(self):
help='Enable post-fill adverse selection measurement logging')
parser.add_argument('--adverse-selection-log-interval', type=float,
help='Adverse selection summary log interval in seconds (default: 300)')
parser.add_argument('--rejection-log-level', type=str,
choices=['error', 'warning', 'info', 'debug'],
help='Log level for routine post-only order rejections '
'(default: error — preserves legacy behaviour). '
'Set to "warning" to reduce ERROR noise once the '
'5min summary line is trusted.')
parser.add_argument('--rejection-summary-interval', type=float,
help='Order rejection aggregate summary interval in seconds; '
'0 disables the summary (default: 300)')
parser.add_argument('--coin-offset-overrides', type=str, default='',
help='Per-coin BBO offset overrides in bps (e.g. "SP500:0.5,MSFT:3")')
parser.add_argument('--coin-spread-overrides', type=str, default='',
Expand Down
40 changes: 33 additions & 7 deletions order_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from collections import defaultdict
from typing import Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
Expand All @@ -12,6 +12,9 @@
from coin_utils import is_hip3, parse_coin
from ttl_cache import TTLCacheEntry, TTLCacheMap

if TYPE_CHECKING:
from order_rejection_tracker import OrderRejectionTracker

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -90,6 +93,30 @@ def __init__(self, exchange: Exchange, info: Info, account_address: str,
# Not thread-safe; bot runs single-threaded.
self._open_orders_cache: TTLCacheEntry[List[Dict]] = TTLCacheEntry(user_state_cache_ttl)

# Optional rejection tracker: when set, routine post-only rejections
# are routed through it for log-level downgrade and aggregation.
# When ``None`` the legacy ERROR-level path is used (unchanged).
self._rejection_tracker: Optional["OrderRejectionTracker"] = None

def set_rejection_tracker(self, tracker: "OrderRejectionTracker") -> None:
    """Register *tracker* so subsequent rejections are classified/aggregated.

    Until a tracker is registered, rejections follow the legacy
    ERROR-level logging path unchanged.
    """
    self._rejection_tracker = tracker

def _record_rejection(self, coin: str, error_msg: str, prefix: str = "Order rejected") -> None:
    """Dispatch one rejection: tracker if wired, legacy ERROR line otherwise.

    A registered tracker performs its own classification and logging
    (routine tags at the configured level, unknown text at ERROR).
    With no tracker the historical ``"<prefix>: <msg>"`` ERROR line is
    emitted unchanged.
    """
    tracker = self._rejection_tracker
    if tracker is None:
        # Legacy path — byte-identical log line for existing scrapers.
        logger.error(f"{prefix}: {error_msg}")
        return
    tracker.record(coin, error_msg)

def _invalidate_open_orders_cache(self) -> None:
    """Clear cached open orders after a write operation (place/cancel)."""
    # Drops the TTL cache entry so the next read refetches fresh state
    # instead of serving a snapshot that predates the write.
    self._open_orders_cache.invalidate()
Expand Down Expand Up @@ -392,9 +419,7 @@ def _place_order(self, order: Order) -> Optional[Order]:
return order

if 'error' in status_info:
logger.error(
"Order rejected: %s", status_info['error']
)
self._record_rejection(order.coin, status_info['error'])
order.status = OrderStatus.REJECTED
return None

Expand Down Expand Up @@ -495,9 +520,10 @@ def _bulk_place_orders_with_builder(
break

if 'error' in status_info:
logger.error(
"Bulk order [%d] rejected: %s",
i, status_info['error'],
self._record_rejection(
orders[i].coin,
status_info['error'],
prefix=f"Bulk order [{i}] rejected",
)
orders[i].status = OrderStatus.REJECTED
continue
Expand Down
258 changes: 258 additions & 0 deletions order_rejection_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
"""Order rejection classification and aggregation.

The bot's order placement path (``order_manager.py``) historically emits
an ERROR-level log for *every* rejection returned by the Hyperliquid
exchange, regardless of whether the rejection signals a routine retry
condition (e.g. post-only orders that would have crossed the BBO) or a
genuine error condition. Under live MM load the routine post-only
rejection accounts for nearly all ERROR records, drowning out real
anomalies in monitoring.

This module reclassifies each rejection by matching the API error text
against a static set of known "routine" patterns. Matched rejections can
be downgraded to WARNING / INFO via config, and the tracker accumulates
per-coin counts that are flushed as a single summary line every
``rejection_summary_interval`` seconds.

The tracker is *opt-in*: ``order_manager.py`` only invokes it when one
has been registered via ``set_rejection_tracker``. With no tracker the
legacy ERROR-level path is preserved exactly.

Public surface:

* :class:`OrderRejectionTracker` — main tracker.
* :func:`classify_rejection` — pure helper used by tests.

Both are intentionally cheap on the order-placement hot path: ``record``
is O(1) under a short-lived lock, and the periodic summary is driven
externally by the strategy's main loop (no background thread).
"""

import logging
import re
import threading
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Optional, Tuple

logger = logging.getLogger(__name__)


# Pattern matcher → tag. Patterns are evaluated in order; current entries
# are mutually exclusive on the API's error text format. Adding a new
# routine pattern (e.g. ``Reduce only``) is a one-line edit here plus a
# regression test in ``tests/test_order_rejection_tracker.py``.
_ROUTINE_PATTERNS: Tuple[Tuple["re.Pattern[str]", str], ...] = (
    (re.compile(r"Post only order would have immediately matched"),
     "post_only_match"),
)

# Mapping from log-level config string to logging level constant. Keys
# are normalised to lowercase before lookup so config values like
# ``"WARNING"`` continue to work.
_LEVEL_MAP: Dict[str, int] = {
    "error": logging.ERROR,
    "warning": logging.WARNING,
    "info": logging.INFO,
    "debug": logging.DEBUG,
}

# Allowed config string values, exposed for validators.
ALLOWED_LOG_LEVELS = tuple(_LEVEL_MAP.keys())

# Tag used for unmatched rejection text. Always logged at ERROR so a
# format change on the exchange surfaces immediately.
UNKNOWN_TAG = "unknown"

# Pre-compiled "bbo was <bid>@<ask>" extractor. Hoisted to module level
# for consistency with ``_ROUTINE_PATTERNS`` and to avoid the per-call
# ``re`` module-cache lookup on the order-placement hot path.
_BBO_RE: "re.Pattern[str]" = re.compile(r"bbo was ([\d.]+@[\d.]+)")


def classify_rejection(msg: str) -> str:
    """Return the routine tag for *msg*, or :data:`UNKNOWN_TAG` if none match.

    Pure function exposed for tests so the pattern map can be exercised
    without instantiating a tracker.
    """
    for pat, tag in _ROUTINE_PATTERNS:
        if pat.search(msg):
            return tag
    return UNKNOWN_TAG


def _extract_bbo(msg: str) -> str:
    """Best-effort extraction of the ``"bid@ask"`` snippet from a reject text.

    Returns an empty string when the pattern is absent (e.g. older HL
    error messages or future format changes).
    """
    m = _BBO_RE.search(msg)
    return m.group(1) if m else ""


@dataclass
class _CoinStats:
    """Per-(tag, coin) rejection counters for one summary window."""

    count: int = 0  # rejections recorded in the current window
    first_ts: float = 0.0  # wall-clock time of the window's first rejection
    last_ts: float = 0.0  # wall-clock time of the most recent rejection
    last_bbo: str = ""  # "bid@ask" snippet from the latest reject text, "" if absent


class OrderRejectionTracker:
    """Classify and aggregate order-rejection events.

    Public methods are thread-safe. ``record`` is on the order-placement
    hot path and intentionally O(1).
    """

    def __init__(
        self,
        routine_log_level: str = "error",
        summary_interval: float = 300.0,
    ) -> None:
        """Create a tracker.

        Args:
            routine_log_level: one of :data:`ALLOWED_LOG_LEVELS`
                (case-insensitive). Unknown / empty values fall back to
                ``error``, preserving legacy behaviour.
            summary_interval: seconds between ``[reject-summary]`` lines;
                values ``<= 0`` disable the summary entirely.
        """
        self._level: int = _LEVEL_MAP.get(
            (routine_log_level or "error").lower(), logging.ERROR
        )
        self._summary_interval: float = float(summary_interval)

        # tag -> coin -> _CoinStats. Reset after each summary flush.
        self._stats: Dict[str, Dict[str, _CoinStats]] = defaultdict(
            lambda: defaultdict(_CoinStats)
        )
        self._unknown_count: int = 0
        self._lock = threading.Lock()
        # Monotonic clock for interval math (immune to wall-clock jumps).
        self._last_summary_ts: float = time.monotonic()

    # ------------------------------------------------------------------ #
    # Recording
    # ------------------------------------------------------------------ #
    def record(self, coin: str, raw_msg: str) -> str:
        """Record a rejection. Returns the matched tag, or :data:`UNKNOWN_TAG`.

        Emits the per-rejection log at the configured level for routine
        matches; unknown patterns are forwarded to ERROR so format
        changes / new reject reasons remain visible.

        At the default ERROR level the line is byte-identical to the
        legacy ``logger.error("Order rejected: …")`` produced by
        ``order_manager.py`` so existing log scrapers / ERROR-rate alert
        templates continue to match. The richer
        ``[reject:tag] coin — …`` format is reserved for the opt-in
        downgraded levels (warning / info / debug).
        """
        tag = classify_rejection(raw_msg)
        bbo = _extract_bbo(raw_msg)
        now_wall = time.time()

        if tag == UNKNOWN_TAG:
            with self._lock:
                self._unknown_count += 1
            # Legacy ``Order rejected: <msg>`` format preserved so
            # back-compat with log scrapers holds for unknowns too.
            # Lazy %-args: formatting is skipped if the record is filtered.
            logger.error("Order rejected: %s", raw_msg)
            return tag

        with self._lock:
            stats = self._stats[tag][coin]
            if stats.count == 0:
                stats.first_ts = now_wall
            stats.count += 1
            stats.last_ts = now_wall
            stats.last_bbo = bbo

        # Logger call deliberately outside the lock: the lock guards
        # only the in-memory counters.
        if self._level == logging.ERROR:
            # Default deployment: emit the legacy line verbatim so back-
            # compat with existing log scrapers / templated alerts holds.
            logger.error("Order rejected: %s", raw_msg)
        else:
            # Downgraded — operator has opted in, give them the richer
            # categorised line that carries coin + tag for grep-ability.
            logger.log(self._level, "[reject:%s] %s — %s", tag, coin, raw_msg)
        return tag

    # ------------------------------------------------------------------ #
    # Periodic summary
    # ------------------------------------------------------------------ #
    def log_summary_if_due(self, now_monotonic: Optional[float] = None) -> bool:
        """Emit a summary line if the configured interval has elapsed.

        Returns ``True`` iff a summary line was actually emitted, ``False``
        when:

        * ``summary_interval <= 0`` (feature disabled), or
        * the interval has not elapsed yet, or
        * the interval elapsed but no rejections occurred (counters were
          flushed silently — the strategy main loop discards the return
          value, but tests / external observers can rely on this signal).
        """
        if self._summary_interval <= 0:
            return False
        now = now_monotonic if now_monotonic is not None else time.monotonic()

        # BUG FIX: the elapsed-time check must run under the lock together
        # with the snapshot swap. Previously it ran unlocked, so two
        # concurrent callers could both pass the check and each emit a
        # (partially empty) summary — violating the thread-safety contract.
        with self._lock:
            if now - self._last_summary_ts < self._summary_interval:
                return False
            snapshot = self._stats
            unknown = self._unknown_count
            self._stats = defaultdict(lambda: defaultdict(_CoinStats))
            self._unknown_count = 0
            self._last_summary_ts = now

        # Logging happens outside the lock, consistent with record().
        return self._emit_summary(snapshot, unknown)

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _emit_summary(
        self,
        snapshot: Dict[str, Dict[str, _CoinStats]],
        unknown: int,
    ) -> bool:
        """Emit summary lines and return ``True`` iff anything was logged.

        Returns ``False`` when both *snapshot* and *unknown* are empty —
        the helper stays silent during idle periods rather than emitting
        a meaningless ``total=0`` line every interval.
        """
        any_emitted = False
        for tag, by_coin in snapshot.items():
            if not by_coin:
                continue
            total = sum(s.count for s in by_coin.values())
            ranked = sorted(
                by_coin.items(), key=lambda kv: kv[1].count, reverse=True
            )
            # Cap the per-coin breakdown at the 8 busiest coins to keep
            # the summary line grep-able at large coin counts.
            top = ", ".join(f"{coin}={s.count}" for coin, s in ranked[:8])
            sample_bbo = ranked[0][1].last_bbo if ranked else ""
            logger.info(
                f"[reject-summary] tag={tag} window={self._summary_interval:.0f}s "
                f"total={total} coins={len(by_coin)} top=[{top}] "
                f"sample_bbo={sample_bbo}"
            )
            any_emitted = True
        if unknown > 0:
            logger.warning(
                f"[reject-summary] tag=unknown window={self._summary_interval:.0f}s "
                f"count={unknown} (unknown patterns logged at ERROR individually)"
            )
            any_emitted = True
        return any_emitted

    # ------------------------------------------------------------------ #
    # Operational helpers (read-only)
    # ------------------------------------------------------------------ #
    def get_stats_snapshot(self) -> Dict[str, Dict[str, int]]:
        """Read-only counter snapshot for tests / external observers.

        Does not reset internal state (use :meth:`log_summary_if_due`
        for flush semantics).
        """
        with self._lock:
            return {
                tag: {coin: stats.count for coin, stats in by_coin.items()}
                for tag, by_coin in self._stats.items()
            }

    def get_unknown_count(self) -> int:
        """Number of unknown-pattern rejections since the last flush."""
        with self._lock:
            return self._unknown_count
Loading
Loading