Optimize TaskBroker scanning: larger batches + cursor-based scan#213
Open
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Tests that all code paths creating task keys (enqueue, expedite, restart, concurrency grants, retries, priority ordering) result in tasks being picked up by the TaskBroker scanner even when those tasks sort before entries already in the buffer. This coverage is important before optimizing the scanner to use cursor-based scanning. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The TaskBroker scanner previously always started from the beginning of the task group key range on every scan pass. When the buffer already contained N entries, each scan had to read through all N before finding new entries to insert — O(buffer_size) wasted reads per scan. Now the scanner tracks a cursor (the last key read) and resumes from there on subsequent passes within a fill cycle. This eliminates the redundant re-reads of already-buffered entries. The cursor is invalidated whenever wakeup() fires, which happens on every code path that creates new task keys (enqueue, expedite, restart, concurrency grants, retries). This ensures tasks created behind the cursor are always picked up on the next scan. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
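The cursor-resume-plus-invalidation scheme described above can be sketched in miniature. This is an illustrative model, not the PR's code: a BTreeMap stands in for the ordered SlateDB task-group key range, and the Scanner type and its method names are hypothetical.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

// Hypothetical stand-in for the TaskBroker scan state: a BTreeMap plays
// the role of the ordered task key range in SlateDB.
struct Scanner {
    cursor: Option<Vec<u8>>, // last key read; None = scan from the start
}

impl Scanner {
    fn new() -> Self {
        Scanner { cursor: None }
    }

    // A wakeup (enqueue, expedite, restart, grant, retry) may have written
    // a key that sorts before the cursor, so the cursor is invalidated and
    // the next scan restarts from the beginning of the range.
    fn wakeup(&mut self) {
        self.cursor = None;
    }

    // One scan pass: read up to `batch` entries, resuming strictly after
    // the cursor instead of re-reading from the start of the range.
    fn scan(&mut self, db: &BTreeMap<Vec<u8>, u64>, batch: usize) -> Vec<u64> {
        let lo = match &self.cursor {
            Some(c) => Bound::Excluded(c.clone()),
            None => Bound::Unbounded,
        };
        let mut out = Vec::new();
        for (k, v) in db.range((lo, Bound::Unbounded)).take(batch) {
            self.cursor = Some(k.clone()); // remember the last key read
            out.push(*v);
        }
        out
    }
}
```

The sketch elides the real scanner's deduplication against entries already buffered; after a wakeup, a fresh scan from the start would re-read buffered entries, which is exactly the cost the cursor avoids on the common (no-wakeup) path.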
Replaces the synthetic scan simulation with a benchmark that exercises the full TaskBroker pipeline end-to-end: enqueue 50K tasks via import, then drain them via sustained dequeue calls measuring throughput and per-batch latency. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Instead of tracking a cursor key and creating a new SlateDB iterator each scan pass, hold the actual DbIterator as a local variable in the scan loop and march it forward. This avoids both the iterator creation cost (~160us) and the cost of re-reading already-buffered entries. The iterator stops when it hits future-timestamped tasks (HitFuture) or exhausts the range (Exhausted). It is dropped and recreated when:

- wakeup() fires (new tasks may have been written behind the cursor)
- The iterator is older than MAX_ITER_AGE (5s)
- The iterator is exhausted

When a future task is encountered, the consumed entry is held back and re-evaluated on the next pass with a fresh now_ms, so it is never lost from the iterator.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
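The stop/hold-back behavior is the subtle part: an entry consumed from the iterator but not yet ready must survive to the next pass. A simplified model, where a Vec iterator stands in for SlateDB's DbIterator and the ScanStop variants mirror the HitFuture/Exhausted states named above (all other names are hypothetical):

```rust
// Simplified model of the cached-iterator scan pass. Entries are
// (start_at_ms, task_id) pairs in key order.
struct CachedScan {
    iter: std::vec::IntoIter<(u64, u32)>, // stand-in for the cached DbIterator
    held_back: Option<(u64, u32)>,        // future entry consumed last pass
}

#[derive(Debug, PartialEq)]
enum ScanStop {
    HitFuture, // next entry is not ready yet; iterator kept for reuse
    Exhausted, // key range fully drained; iterator must be recreated
}

impl CachedScan {
    fn new(snapshot: Vec<(u64, u32)>) -> Self {
        CachedScan { iter: snapshot.into_iter(), held_back: None }
    }

    // One scan pass: emit ready tasks into `out`; stop at the first
    // future-timestamped entry, holding it back for the next pass.
    fn pass(&mut self, now_ms: u64, out: &mut Vec<u32>) -> ScanStop {
        // Re-evaluate the held-back entry first with the fresh now_ms.
        if let Some((ts, id)) = self.held_back.take() {
            if ts > now_ms {
                self.held_back = Some((ts, id)); // still not ready
                return ScanStop::HitFuture;
            }
            out.push(id);
        }
        while let Some((ts, id)) = self.iter.next() {
            if ts > now_ms {
                // Consumed from the iterator but not ready: hold it back
                // so it is never lost.
                self.held_back = Some((ts, id));
                return ScanStop::HitFuture;
            }
            out.push(id);
        }
        ScanStop::Exhausted
    }
}
```

The wakeup- and age-triggered recreation paths are omitted here; in both cases the model would simply be rebuilt from a fresh snapshot, with any held-back entry re-read from the store rather than carried over.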

Summary

- Increase scan_batch from 1024 → 4096 so a single scan pass can refill the buffer from low-watermark to target, eliminating 3x redundant re-reads per refill cycle
- When a future task is encountered, the consumed entry is held back and re-evaluated on the next pass with a fresh now_ms, so it is never lost from the iterator
- wakeup() invalidates the cached iterator, ensuring newly-created tasks (from enqueue, expedite, restart, concurrency grants, retries) that sort before the old cursor position are picked up via a fresh scan from the beginning
- The cached iterator is refreshed after MAX_ITER_AGE (5s) to pick up scheduled tasks that have become ready

Test plan
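The "3x redundant re-reads" figure can be checked with a small arithmetic sketch. The low-watermark and target values below are assumptions chosen to match the stated batch sizes, not numbers taken from the PR; the point is only how restart-from-the-beginning scanning multiplies reads.

```rust
// Illustrative arithmetic (watermark/target values are assumed): a scan
// that restarts at the beginning of the range re-reads every
// already-buffered entry on each pass before reaching new ones.

// Passes needed to refill the buffer from `low` to `target` entries.
fn passes_needed(low: usize, target: usize, batch: usize) -> usize {
    (target - low + batch - 1) / batch // ceiling division
}

// Total entries read by restart-from-the-start scanning over one refill.
fn total_reads_restarting(low: usize, target: usize, batch: usize) -> usize {
    let mut buffered = low;
    let mut reads = 0;
    while buffered < target {
        let new = batch.min(target - buffered);
        reads += buffered + new; // redundant re-reads + useful new reads
        buffered += new;
    }
    reads
}
```

With an assumed low-watermark of 1024 and target of 4096, a batch of 1024 needs three passes and 9216 total reads to gain 3072 new entries (3x), while a 4096 batch refills in one pass; cursor-based resumption removes the remaining re-reads as well.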
🤖 Generated with Claude Code
Note
Medium Risk
Changes core dequeue/scanner behavior by introducing a cached iterator and new stop/resume semantics around future tasks and wakeups, which could affect task availability/latency if incorrect. Risk is mitigated by extensive new regression tests covering behind-cursor task creation paths.
Overview
Optimizes TaskBroker scanning to avoid redundant DB re-reads by caching a SlateDB DbIterator across scan passes, advancing a cursor through the task-group key range instead of restarting from the beginning every time. The scan loop now refreshes the iterator on wakeup(), on iterator exhaustion, or after MAX_ITER_AGE (5s) to pick up newly-ready scheduled work; it also halts at the first future task and carries that consumed entry forward for re-evaluation on the next pass.

Adds a new task_scanner_ordering_tests.rs suite to ensure tasks created "behind the cursor" (e.g., start_at_ms=0, expedite/restart, concurrency grants, retries, priority ordering, multi-group, bulk enqueue) are still eventually dequeued, and updates the task_scanner_profile benchmark to measure real sustained dequeue throughput instead of synthetic scan timing.

Written by Cursor Bugbot for commit cdc0501. This will update automatically on new commits.