Skip to content

feat: add maxConcurrentRuns to worker (#162 step 3/4)#173

Merged
coji merged 6 commits into
mainfrom
feat/max-concurrent-runs
Mar 28, 2026
Merged

feat: add maxConcurrentRuns to worker (#162 step 3/4)#173
coji merged 6 commits into
mainfrom
feat/max-concurrent-runs

Conversation

@coji
Copy link
Copy Markdown
Owner

@coji coji commented Mar 28, 2026

Summary

  • Add maxConcurrentRuns option to createDurably() for parallel run processing
  • Worker maintains a slot-based concurrency pool: fills slots eagerly, refills immediately on completion, falls back to delayed polling when idle
  • Default: 1 (sequential — no behavioral change for existing users)
  • stop() uses Promise.allSettled to gracefully handle rejected in-flight runs
  • Concurrency key exclusion still enforced by storage layer

Changes

Area Files What
Worker worker.ts Slot-based polling loop: fillSlots()processSlotCycle() → refill/idle
Config durably.ts maxConcurrentRuns in DurablyOptions, validated as positive integer
Unit tests worker.test.ts Scheduler tests: max concurrency, immediate refill, no stacked polls
Integration worker.shared.ts, concurrency.shared.ts, durably.shared.ts Parallel execution, concurrency key exclusion, validation, stop semantics
Docs llms.md, website/api/, website/guide/, examples API reference and deployment guide

Context

Step 3/4 of #162 (core internal restructuring). Storage layer already supports concurrent claims (SKIP LOCKED, lease_generation, concurrency_key), so the change is isolated to the worker polling loop.

Test plan

  • pnpm validate passes (format, lint, typecheck, tests)
  • takt acceptance: 8/8 completion criteria met
  • Integration tests: parallel execution, concurrency key exclusion, immediate refill, stop waits for all in-flight
  • CodeRabbit review

🤖 Generated with Claude Code

Summary by CodeRabbit

  • 新機能

    • ワーカーの同時実行上限を指定するオプション maxConcurrentRuns を追加(デフォルト: 1)
  • 改善

    • stop() が全ての進行中実行とワーカー起動のアイドル保守処理完了まで待機する動作を明確化
    • pollingIntervalMs の説明を「アイドル時の再ポーリング遅延」として明確化
  • テスト

    • 同時実行制御とポーリング挙動を検証するテスト群を追加
  • ドキュメント

    • 例とAPIドキュメントを更新し新オプションと挙動を追記

Add maxConcurrentRuns option to createDurably() for parallel run
processing. Worker maintains a slot-based concurrency pool: fills
slots eagerly, refills immediately when work completes, and falls
back to delayed polling when idle.

- Default: 1 (sequential, no behavioral change for existing users)
- Validated as positive integer
- stop() uses Promise.allSettled to handle rejected in-flight runs
- Concurrency key exclusion still enforced by storage layer
- Docs, examples, and tests updated

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@vercel
Copy link
Copy Markdown

vercel Bot commented Mar 28, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
durably-demo Ready Ready Preview Mar 28, 2026 4:15pm
1 Skipped Deployment
Project Deployment Actions Updated (UTC)
durably-demo-vercel-turso Ignored Ignored Preview Mar 28, 2026 4:15pm

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Mar 28, 2026

Warning

Rate limit exceeded

@coji has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 7 minutes and 40 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 7 minutes and 40 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 276dcfb9-12e6-452f-9b61-311f19f7a9c3

📥 Commits

Reviewing files that changed from the base of the PR and between 5ff88d7 and c5985bf.

📒 Files selected for processing (4)
  • packages/durably/docs/llms.md
  • packages/durably/tests/shared/concurrency.shared.ts
  • packages/durably/tests/shared/worker.shared.ts
  • website/public/llms.txt
📝 Walkthrough

Walkthrough

DurablyOptionsmaxConcurrentRuns(デフォルト1)を追加し、ワーカーを単一ポーリングループからスロットベースの並行実行モデルへ移行。検証・アイドル処理・ポーリングスケジューリング・stop()待機挙動を更新し、関連ドキュメントとテストを拡張しました。

Changes

Cohort / File(s) Summary
コア実装
packages/durably/src/durably.ts
DurablyOptionsmaxConcurrentRuns?: numberを追加。DEFAULTSへ反映し、validateMaxConcurrentRuns()で検証。createDurablyで検証済み値をワーカーへ渡す。
ワーカーロジック
packages/durably/src/worker.ts
WorkerConfigmaxConcurrentRunsを追加。単一poll()とinFlight機構を廃止し、fillSlots()/activeCount/activePromisesのスロットベース並列処理モデルへ再設計。アイドル保守・遅延ポーリング・stop()待機を改訂。
型テスト
packages/durably/tests/node/types.test.ts
createDurably第1引数型にmaxConcurrentRuns?: numberが含まれることを型レベルでアサートするテストを追加。
ワーカースケジューラ単体テスト
packages/durably/tests/node/worker.test.ts
フェイクタイマーでスケジューラのタイムアウト蓄積防止、maxConcurrentRuns上限、スロット即時補填、遅延ポーリング重複防止を検証するテスト群を追加。
統合並行実行テスト
packages/durably/tests/shared/concurrency.shared.ts
maxConcurrentRuns: 2/3での並行実行テストを追加。concurrencyKeyの独立性・順序保証テストを追加・調整。
Durably共有テスト
packages/durably/tests/shared/durably.shared.ts
ValidationError導入。maxConcurrentRuns省略時のデフォルト成功テストと無効値(0、負数、非整数、NaN、Infinity)での検証エラーを追加。
ワーカーテスト共有
packages/durably/tests/shared/worker.shared.ts
maxConcurrentRuns省略/複数値/1での即時補填/stop()の待機動作などを検証するスイートを追加。
ドキュメント・例
packages/durably/docs/llms.md, packages/durably-react/docs/llms.md, website/api/*.md, website/public/llms.txt, examples/**/durably*.ts
多数の例・ドキュメントにmaxConcurrentRuns: 1を追記。pollingIntervalMs説明を「アイドル時の再ポーリング遅延」に明確化。stop()挙動記述を更新。

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Worker as Worker (Orchestrator)
    participant SlotFiller as Slot Filler (fillSlots)
    participant Processor as Job Processor (processOne)
    participant Storage as Storage
    participant Idle as Idle Maintenance
    participant Scheduler as Polling Scheduler

    Client->>Worker: start()
    activate Worker

    Worker->>SlotFiller: fillSlots()
    activate SlotFiller

    loop while activeCount < maxConcurrentRuns
        SlotFiller->>Processor: processOne()
        activate Processor
        Processor->>Storage: claim/process job
        alt job found
            Storage-->>Processor: jobData
            Processor->>Storage: mark complete
        else no job
            Storage-->>Processor: false
        end
        deactivate Processor
        SlotFiller->>SlotFiller: track activePromises/activeCount
    end
    deactivate SlotFiller

    alt all slots idle and processOne returned false
        Worker->>Idle: runIdleMaintenanceSafe()
        Idle-->>Worker: onIdle done
        Worker->>Scheduler: scheduleDelayedPoll(pollingIntervalMs)
    else slot freed mid-run
        SlotFiller->>Processor: claim next immediately
    end

    Client->>Worker: stop()
    Worker->>Worker: await Promise.allSettled(activePromises + idleMaintenance)
    deactivate Worker
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

🐰 ひとつずつ、ゆっくり跳ぶよ、
スロット開いたらすぐ埋めるよ。
数を決めて、順序は守る、
テストと一緒にぴょんと跳ねる、
小さなうさぎのデプロイ祝う。

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 15.38% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: add maxConcurrentRuns to worker (#162 step 3/4)' accurately reflects the main changes in the changeset—adding a maxConcurrentRuns feature to control worker concurrency and implementing it throughout the codebase.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/max-concurrent-runs

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (3)
website/public/llms.txt (1)

577-577: ブラウザ例の空行について確認してください。

Line 577 で pollingIntervalMs: 100, の後に空行が追加されていますが、Node.js 例(Line 63-64)では maxConcurrentRuns が追加されている一方、ブラウザ例では追加されていません。一貫性のため、ブラウザ例にも maxConcurrentRuns を追加するか、空行を削除することを検討してください。

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@website/public/llms.txt` at line 577, ブラウザ例の設定で "pollingIntervalMs: 100,"
の後に空行があるため、Node.js 例(該当箇所で "maxConcurrentRuns"
を追加している)と不一致になっています。ブラウザ例にも同じ構成プロパティ "maxConcurrentRuns" を追加して Node.js
例と揃えるか、不要であれば空行を削除して両例のフォーマットを一致させてください(参照する識別子: pollingIntervalMs,
maxConcurrentRuns)。
packages/durably/docs/llms.md (1)

577-577: ブラウザ例に maxConcurrentRuns が欠落しています。

Node.js 例では maxConcurrentRuns: 1 が追加されていますが、ブラウザ例では空行のみが追加されています。一貫性のため、ブラウザ例にも同様のオプションとコメントを追加することを検討してください。

📝 提案される修正
 const durably = createDurably({
   dialect,
   pollingIntervalMs: 100,
-
+  maxConcurrentRuns: 1, // Concurrent runs processed by the worker (default: 1)
   leaseRenewIntervalMs: 500,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/durably/docs/llms.md` at line 577, ブラウザ用の例に Node.js
例で追加されたオプションが欠けているため、ブラウザのサンプルに maxConcurrentRuns: 1 を追加し、Node.js と同じ説明コメント(例:
"// Limit to one concurrent run for deterministic behavior" など)を入れて一貫性を保ってください;
対象となるサンプルはドキュメント内のブラウザ例の設定オブジェクト(参照: maxConcurrentRuns, ブラウザ例の初期化ブロック)を更新します。
packages/durably/tests/shared/concurrency.shared.ts (1)

124-147: ローカルに作る Durably インスタンスも finally で teardown した方が安全です。

この 3 ケースは末尾でしか dp.stop() / d.db.destroy() を呼んでいないので、途中の waitForexpect が失敗すると polling timer と DB handle が残ります。try/finally か suite-level cleanup に寄せておくと、後続テストへの波及を防げます。

Also applies to: 164-203, 256-281

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/durably/tests/shared/concurrency.shared.ts` around lines 124 - 147,
Wrap the Durably test setup and teardown in a try/finally (or use suite-level
afterEach) so dp.stop() and d.db.destroy() always run even if assertions or
waitFor fail: locate the creation calls (createDurably, d.migrate, d.register,
dp.start) and ensure dp.stop() and d.db.destroy() are invoked in the finally
block (or centralized cleanup) to prevent leftover polling timers/DB handles;
apply the same change to the other two test blocks referenced (the blocks around
lines 164-203 and 256-281).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/durably/src/durably.ts`:
- Around line 104-110: The validateMaxConcurrentRuns function currently uses
Number.isInteger which allows unsafe integers; change the validation in
validateMaxConcurrentRuns to use Number.isSafeInteger(value) and keep the >= 1
check (e.g., if (!Number.isSafeInteger(value) || value < 1) throw new
ValidationError('maxConcurrentRuns must be a positive safe integer')); update
the error message accordingly so callers and the worker loop (while (running &&
activeCount < maxConcurrentRuns)) cannot receive unsafe large integers that
would cause excessive synchronous Promise creation.

In `@packages/durably/src/worker.ts`:
- Around line 81-89: The current branch for "!didProcess && activeCount > 0"
never reschedules a poll, causing freed slots to be lost while long-running runs
remain; update the logic in the worker loop to call scheduleDelayedPoll (or
otherwise enqueue a poll) when didProcess is false but activeCount > 0 so empty
slots will trigger another poll; specifically modify the branch around
didProcess/activeCount/running (references: didProcess, activeCount, running,
runIdleMaintenanceSafe, scheduleDelayedPoll) to ensure scheduleDelayedPoll() is
invoked in the "!didProcess && activeCount > 0" case so available capacity is
reused.
- Around line 107-113: The promise returned by p.finally(...) must also be
handled to avoid an unhandled rejection when the original p rejects; update the
code that currently calls void p.finally(() => { activePromises.delete(p) }) so
that the finally-returned promise is captured and has a catch handler (e.g.,
chain .catch(...) on the result) or chain .catch(...) before .finally; reference
the promise variable p and the cleanup action activePromises.delete(p) (and the
surrounding processSlotCycle/processOne flow) when implementing the fix.

---

Nitpick comments:
In `@packages/durably/docs/llms.md`:
- Line 577: ブラウザ用の例に Node.js 例で追加されたオプションが欠けているため、ブラウザのサンプルに maxConcurrentRuns:
1 を追加し、Node.js と同じ説明コメント(例: "// Limit to one concurrent run for deterministic
behavior" など)を入れて一貫性を保ってください; 対象となるサンプルはドキュメント内のブラウザ例の設定オブジェクト(参照:
maxConcurrentRuns, ブラウザ例の初期化ブロック)を更新します。

In `@packages/durably/tests/shared/concurrency.shared.ts`:
- Around line 124-147: Wrap the Durably test setup and teardown in a try/finally
(or use suite-level afterEach) so dp.stop() and d.db.destroy() always run even
if assertions or waitFor fail: locate the creation calls (createDurably,
d.migrate, d.register, dp.start) and ensure dp.stop() and d.db.destroy() are
invoked in the finally block (or centralized cleanup) to prevent leftover
polling timers/DB handles; apply the same change to the other two test blocks
referenced (the blocks around lines 164-203 and 256-281).

In `@website/public/llms.txt`:
- Line 577: ブラウザ例の設定で "pollingIntervalMs: 100," の後に空行があるため、Node.js 例(該当箇所で
"maxConcurrentRuns" を追加している)と不一致になっています。ブラウザ例にも同じ構成プロパティ "maxConcurrentRuns"
を追加して Node.js 例と揃えるか、不要であれば空行を削除して両例のフォーマットを一致させてください(参照する識別子:
pollingIntervalMs, maxConcurrentRuns)。
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a94c70b7-1174-4ec9-8c34-99cb7c8a61f9

📥 Commits

Reviewing files that changed from the base of the PR and between 0495ec5 and 3a9b3a1.

📒 Files selected for processing (11)
  • packages/durably/docs/llms.md
  • packages/durably/src/durably.ts
  • packages/durably/src/worker.ts
  • packages/durably/tests/node/types.test.ts
  • packages/durably/tests/node/worker.test.ts
  • packages/durably/tests/shared/concurrency.shared.ts
  • packages/durably/tests/shared/durably.shared.ts
  • packages/durably/tests/shared/worker.shared.ts
  • website/api/create-durably.md
  • website/api/index.md
  • website/public/llms.txt

Comment thread packages/durably/src/durably.ts
Comment thread packages/durably/src/worker.ts Outdated
Comment thread packages/durably/src/worker.ts Outdated
- Use Number.isSafeInteger instead of Number.isInteger to prevent
  unsafe large integers from causing excessive Promise creation
- Fix lost slot bug: schedule delayed poll when !didProcess regardless
  of activeCount, so idle slots re-poll for new work while other slots
  are still running
- Fix unhandled rejection: chain .finally().catch() instead of
  separate .catch() and .finally() on the same promise

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/durably/src/durably.ts`:
- Around line 96-110: The validateMaxConcurrentRuns function currently only
checks Number.isSafeInteger and >0, which allows impractically large values; add
an upper cap constant (e.g., MAX_CONCURRENT_RUNS = 1000) and update
validateMaxConcurrentRuns to throw if value > MAX_CONCURRENT_RUNS, and adjust
DEFAULTS.maxConcurrentRuns if needed to use a safe default under that cap;
reference validateMaxConcurrentRuns and DEFAULTS to locate and enforce the new
upper bound so worker.ts's fillSlots loop cannot allocate millions of slots.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b3725483-73d1-4cfd-9149-1125983116c5

📥 Commits

Reviewing files that changed from the base of the PR and between 3a9b3a1 and 3587e0b.

📒 Files selected for processing (14)
  • examples/fullstack-react-router/app/lib/durably.server.ts
  • examples/fullstack-vercel-turso/app/lib/durably.server.ts
  • examples/server-node-postgres/lib/durably.ts
  • examples/server-node/lib/durably.ts
  • examples/spa-react-router/app/lib/durably.ts
  • examples/spa-vite-react/src/lib/durably.ts
  • packages/durably-react/docs/llms.md
  • packages/durably/src/durably.ts
  • packages/durably/src/worker.ts
  • packages/durably/tests/node/worker.test.ts
  • website/api/durably-react/fullstack.md
  • website/api/durably-react/spa.md
  • website/guide/deployment.md
  • website/public/llms.txt
✅ Files skipped from review due to trivial changes (4)
  • examples/spa-react-router/app/lib/durably.ts
  • website/guide/deployment.md
  • website/public/llms.txt
  • packages/durably-react/docs/llms.md

Comment thread packages/durably/src/durably.ts
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ns removal

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Ensures dp.stop() and d.db.destroy() always run even if assertions
or waitFor timeout, preventing leftover polling timers and DB handles.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coji coji merged commit a9c1645 into main Mar 28, 2026
5 checks passed
@coji coji deleted the feat/max-concurrent-runs branch March 28, 2026 16:17
coji added a commit that referenced this pull request Mar 28, 2026
…afe integer checks

Add structured checklist to spec-review covering:
- Input validation: exact valid domain for numeric options (safe integer)
- Concurrency safety AND liveness (idle slots keep polling, freed slots reused)
- Detached promise rejection ownership (no unhandled rejections)
- stop()/shutdown must use allSettled pattern
- Behavioral regression criteria

Sync matching items to spec-draft quality checklist.

Learned from #173 CodeRabbit review: idle slot loss, unhandled rejection,
and unsafe integer validation were missed at spec stage.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
coji added a commit that referenced this pull request Mar 29, 2026
…afe integer checks (#175)

Add structured checklist to spec-review covering:
- Input validation: exact valid domain for numeric options (safe integer)
- Concurrency safety AND liveness (idle slots keep polling, freed slots reused)
- Detached promise rejection ownership (no unhandled rejections)
- stop()/shutdown must use allSettled pattern
- Behavioral regression criteria

Sync matching items to spec-draft quality checklist.

Learned from #173 CodeRabbit review: idle slot loss, unhandled rejection,
and unsafe integer validation were missed at spec stage.

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant