Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions allways/validator/event_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,22 +790,17 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None
if miner and isinstance(remaining, int):
self._record_collateral_event(block_num, miner, int(remaining))
elif name == 'ReservationExtensionFinalized':
# Event-driven cache update for the local pending_confirms row —
# replaces the polling refresh that the legacy vote-extend flow
# needed (commit 1b942e8). Without this write the upstream
# purge_expired sweep would delete a still-live entry at its
# stale reserved_until.
# Event-driven cache update: bumps both the pending_confirms row and
# the reservation pin so neither purge sweep drops a still-live entry
# at its stale reserved_until. Replaces the legacy polling refresh.
miner = values.get('miner', '')
applied_target = values.get('applied_target')
if miner and isinstance(applied_target, int):
self.state_store.update_reserved_until(miner, applied_target)
self.state_store.extend_reservation_deadline(miner, applied_target)
bt.logging.info(
f'EventWatcher: {self._label(miner)} ReservationExtensionFinalized '
f'applied_target={applied_target} @ block {block_num}'
)
# Keep the pin's TTL in step so purge_expired_reservation_pins
# doesn't drop a still-live pin at its stale deadline.
self.state_store.update_reservation_pin_reserved_until(miner, applied_target)
elif name == 'TimeoutExtensionFinalized':
swap_id = values.get('swap_id')
applied_target = values.get('applied_target')
Expand Down
8 changes: 4 additions & 4 deletions allways/validator/forward.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,10 @@ def try_extend_reservation(
pending=pending,
)
if finalized_target is not None:
# Same-step write so the upstream purge sweep sees the bumped deadline.
# The matching ReservationExtensionFinalized event won't be picked up
# until the next forward step's event_watcher.sync_to.
self.state_store.update_reserved_until(item.miner_hotkey, finalized_target)
# Same-step write so the upstream purge sweeps see the bumped deadline,
# bumping BOTH the pending_confirms row and the pin — updating only the
# row would let the pin purge drop a still-live pin at its stale TTL (#441).
self.state_store.extend_reservation_deadline(item.miner_hotkey, finalized_target)
reserved_until = finalized_target
# The just-finalized proposal is gone from contract storage; refresh
# so downstream challenge/propose see the post-finalize state instead
Expand Down
4 changes: 2 additions & 2 deletions allways/validator/optimistic_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ def maybe_finalize_reservation(

Returns the applied ``target_block`` on success, ``None`` otherwise.
Callers use the returned target to refresh local caches (e.g.
state_store.update_reserved_until) without waiting for the next event
sync. ``pending`` is the caller-supplied snapshot — see
state_store.extend_reservation_deadline) without waiting for the next
event sync. ``pending`` is the caller-supplied snapshot — see
``maybe_propose_reservation``. The contract's own check is
authoritative; the local pending read just lets us avoid a
known-doomed tx. ``challenge_window_blocks`` is passed in (rather
Expand Down
28 changes: 10 additions & 18 deletions allways/validator/state_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,18 +154,6 @@ def remove(self, miner_hotkey: str) -> Optional[PendingConfirm]:
)
return self.row_to_pending(row) if row is not None else None

def update_reserved_until(self, miner_hotkey: str, reserved_until: int) -> None:
"""Refresh the cached reserved_until on an existing pending_confirms row.

Called after the contract's reservation has been extended on-chain — without
this, the row's stale value causes ``purge_expired_pending_confirms`` to
delete a still-live entry the moment the original TTL elapses.
"""
self._execute(
'UPDATE pending_confirms SET reserved_until = ? WHERE miner_hotkey = ?',
(reserved_until, miner_hotkey),
)

def has(self, miner_hotkey: str) -> bool:
row = self._fetchone(
'SELECT 1 FROM pending_confirms WHERE miner_hotkey = ? LIMIT 1',
Expand Down Expand Up @@ -255,13 +243,17 @@ def remove_reservation_pin(self, miner_hotkey: str) -> Optional[ReservationPin]:
)
return self.row_to_reservation_pin(row) if row is not None else None

def update_reservation_pin_reserved_until(self, miner_hotkey: str, reserved_until: int) -> None:
"""Refresh the cached reserved_until on an existing pin row.

Mirrors ``update_reserved_until`` — called after the contract extends
the reservation, so ``purge_expired_reservation_pins`` doesn't drop a
still-live pin at its stale TTL.
def extend_reservation_deadline(self, miner_hotkey: str, reserved_until: int) -> None:
"""Advance a reservation's deadline on BOTH cached copies (pending_confirms
row and reservation pin) so neither purge sweep drops a still-live
reservation. The one write path all extension-finalize callers must use:
bumping only one table is what desynced the pin and prematurely purged
it (#441). Each UPDATE is a no-op when that table has no matching row.
"""
self._execute(
'UPDATE pending_confirms SET reserved_until = ? WHERE miner_hotkey = ?',
(reserved_until, miner_hotkey),
)
self._execute(
'UPDATE reservation_pins SET reserved_until = ? WHERE miner_hotkey = ?',
(reserved_until, miner_hotkey),
Expand Down
83 changes: 82 additions & 1 deletion tests/test_forward.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
reproduces the swap 206 timeout.
"""

from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock

from allways.classes import Swap, SwapStatus
from allways.validator.forward import extend_fulfilled_near_timeout
from allways.validator.forward import extend_fulfilled_near_timeout, try_extend_reservation
from allways.validator.state_store import PendingConfirm, ReservationPin, ValidatorStateStore
from allways.validator.swap_tracker import SwapTracker

# Real swap 206 numbers: extension 2 was proposed at this block, and a BTC
Expand Down Expand Up @@ -136,3 +138,82 @@ def test_skips_extension_when_dest_tx_fails_canonical_check(self):

v.optimistic_extensions.maybe_propose_timeout.assert_not_called()
v.optimistic_extensions.maybe_challenge_timeout.assert_not_called()


def make_reservation_validator(store, current_block, finalized_target):
"""Stand-in Validator for ``try_extend_reservation`` carrying a real state
store. ``maybe_finalize_reservation`` returns ``finalized_target`` (an int
for an inline finalize this step)."""
ext = MagicMock()
ext.fetch_pending_reservation.return_value = MagicMock()
ext.maybe_finalize_reservation.return_value = finalized_target

subtensor = MagicMock()
subtensor.get_current_block.return_value = current_block

contract_client = MagicMock()
contract_client.get_miner_reserved_until.return_value = 1000

return SimpleNamespace(
subtensor=subtensor,
contract_client=contract_client,
optimistic_extensions=ext,
state_store=store,
)


class TestTryExtendReservationPinSync:
"""Regression for #441: an inline reservation-extension finalize must bump
BOTH the pending_confirms row and the reservation pin, so the same forward
step's pin purge can't drop a still-live pin at its stale TTL."""

def _seed(self, store):
store.upsert_reservation_pin(
ReservationPin(
miner_hotkey='miner-1',
reserve_block=900,
from_chain='btc',
to_chain='tao',
rate_str='345',
counter_rate_str='0.0029',
miner_from_address='bc1-miner',
miner_to_address='5miner',
reserved_until=1000,
created_at=1.0,
)
)
item = PendingConfirm(
miner_hotkey='miner-1',
from_tx_hash='tx-1',
from_chain='btc',
to_chain='tao',
from_address='bc1-user',
to_address='5user',
tao_amount=123,
from_amount=456,
to_amount=789,
miner_from_address='bc1-miner',
miner_to_address='5miner',
rate_str='345',
reserved_until=1000,
queued_at=1.0,
)
store.enqueue(item)
return item

def test_inline_finalize_bumps_pin_so_same_step_purge_keeps_it(self, tmp_path: Path):
# Current block is past the original reserved_until (1000), so the pin
# purge would drop the pin unless the inline finalize bumped its TTL.
store = ValidatorStateStore(db_path=tmp_path / 'state.db', current_block_fn=lambda: 1003)
item = self._seed(store)
v = make_reservation_validator(store, current_block=1003, finalized_target=1300)

# tx_info=None returns right after the finalize block, so we exercise the
# finalize path without the propose/challenge machinery.
try_extend_reservation(v, item, current_block=1003, swap_label='X', miner_short='Y', tx_info=None)

assert store.get_reservation_pin('miner-1').reserved_until == 1300
assert store.get_all()[0].reserved_until == 1300
assert store.purge_expired_reservation_pins() == 0
assert store.get_reservation_pin('miner-1') is not None
store.close()
4 changes: 2 additions & 2 deletions tests/test_optimistic_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ def test_finalizes_when_window_elapsed(self):
pending=w.fetch_pending_reservation(MINER),
)
# Returns the applied target so the caller can refresh local caches
# (e.g. state_store.update_reserved_until) without waiting for the
# next event sync.
# (e.g. state_store.extend_reservation_deadline) without waiting for
# the next event sync.
assert result == 1180
w.contract_client.finalize_extend_reservation.assert_called_once()

Expand Down
10 changes: 5 additions & 5 deletions tests/test_pending_confirm_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ def test_purge_expired_pending_confirms_removes_stale_entries(self, tmp_path: Pa
assert queue.has('miner-2')
assert queue.pending_size() == 1

def test_update_reserved_until_prevents_stale_purge(self, tmp_path: Path):
"""Regression: after the contract extends a reservation, refreshing the
def test_extend_reservation_deadline_prevents_stale_purge(self, tmp_path: Path):
"""Regression: after the contract extends a reservation, bumping the
cached reserved_until must keep the row alive past its original TTL."""
db_path = tmp_path / 'state.db'
current_block = 105
queue = ValidatorStateStore(db_path=db_path, current_block_fn=lambda: current_block)

queue.enqueue(PENDING_CONFIRM_SAMPLE1) # reserved_until=100
queue.update_reserved_until('miner-1', 130)
queue.extend_reservation_deadline('miner-1', 130)

items = queue.get_all()
assert len(items) == 1
Expand All @@ -125,10 +125,10 @@ def test_update_reserved_until_prevents_stale_purge(self, tmp_path: Path):
assert removed == 0
assert queue.has('miner-1')

def test_update_reserved_until_unknown_hotkey_is_noop(self, tmp_path: Path):
def test_extend_reservation_deadline_unknown_hotkey_is_noop(self, tmp_path: Path):
db_path = tmp_path / 'state.db'
queue = ValidatorStateStore(db_path=db_path)
queue.update_reserved_until('miner-unknown', 999)
queue.extend_reservation_deadline('miner-unknown', 999)
assert queue.pending_size() == 0

def test_enqueue_and_remove_are_safe_across_threads(self, tmp_path: Path):
Expand Down
52 changes: 44 additions & 8 deletions tests/test_state_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dataclasses import replace
from pathlib import Path

from allways.validator.state_store import ReservationPin, ValidatorStateStore
from allways.validator.state_store import PendingConfirm, ReservationPin, ValidatorStateStore

PIN_SAMPLE1 = ReservationPin(
miner_hotkey='miner-1',
Expand Down Expand Up @@ -133,16 +133,15 @@ def test_get_expired_without_current_block_fn_is_empty(self, tmp_path: Path):
assert store.get_expired_reservation_pins() == []
store.close()

def test_update_reserved_until_keeps_row_a_purge_would_drop(self, tmp_path: Path):
"""Regression: after the contract extends a reservation, refreshing the
pin's reserved_until must keep it alive past its original TTL — exactly
as update_reserved_until does for pending_confirms."""
def test_extend_reservation_deadline_keeps_pin_a_purge_would_drop(self, tmp_path: Path):
"""Regression: after the contract extends a reservation, bumping the
pin's reserved_until must keep it alive past its original TTL."""
store = ValidatorStateStore(
db_path=tmp_path / 'state.db',
current_block_fn=lambda: 1003,
)
store.upsert_reservation_pin(PIN_SAMPLE1) # reserved_until=1000, would be purged at 1003
store.update_reservation_pin_reserved_until('miner-1', 1300)
store.extend_reservation_deadline('miner-1', 1300)

pin = store.get_reservation_pin('miner-1')
assert pin.reserved_until == 1300
Expand All @@ -151,12 +150,49 @@ def test_update_reserved_until_keeps_row_a_purge_would_drop(self, tmp_path: Path
assert store.get_reservation_pin('miner-1') is not None
store.close()

def test_update_reserved_until_unknown_hotkey_is_noop(self, tmp_path: Path):
def test_extend_reservation_deadline_unknown_hotkey_is_noop(self, tmp_path: Path):
store = ValidatorStateStore(db_path=tmp_path / 'state.db')
store.update_reservation_pin_reserved_until('miner-unknown', 9999)
store.extend_reservation_deadline('miner-unknown', 9999)
assert store.get_reservation_pin('miner-unknown') is None
store.close()

def test_extend_reservation_deadline_bumps_both_copies(self, tmp_path: Path):
"""The shared mutator must advance BOTH the pending_confirms row and the
reservation pin, so neither purge sweep drops a still-live reservation
(#441). Updating only one copy is what desynced the pin."""
store = ValidatorStateStore(
db_path=tmp_path / 'state.db',
current_block_fn=lambda: 1003,
)
store.upsert_reservation_pin(PIN_SAMPLE1) # reserved_until=1000
store.enqueue(
PendingConfirm(
miner_hotkey='miner-1',
from_tx_hash='tx-1',
from_chain='btc',
to_chain='tao',
from_address='bc1-user',
to_address='5user',
tao_amount=123,
from_amount=456,
to_amount=789,
miner_from_address='bc1-miner',
miner_to_address='5miner',
rate_str='345',
reserved_until=1000,
queued_at=1.0,
)
)

store.extend_reservation_deadline('miner-1', 1300)

assert store.get_reservation_pin('miner-1').reserved_until == 1300
assert store.get_all()[0].reserved_until == 1300
# Same-block purges leave both rows alone now that both TTLs are current.
assert store.purge_expired_pending_confirms() == 0
assert store.purge_expired_reservation_pins() == 0
store.close()


class TestReservationPinCrossTable:
def test_delete_hotkey_clears_the_pin(self, tmp_path: Path):
Expand Down
Loading