Skip to content

feat: coalesce trigger with concurrencyKey pending limit (#143)#148

Merged
coji merged 20 commits into
mainfrom
feat/coalesce-trigger
Mar 25, 2026
Merged

feat: coalesce trigger with concurrencyKey pending limit (#143)#148
coji merged 20 commits into
mainfrom
feat/coalesce-trigger

Conversation

@coji
Copy link
Copy Markdown
Owner

@coji coji commented Mar 25, 2026

Summary

  • concurrencyKey now enforces max 1 pending — partial unique index prevents unbounded queue buildup. Without coalesce, a second pending trigger throws ConflictError
  • coalesce: 'skip' — opt-in graceful handling: returns the existing pending run instead of creating a new one
  • trigger() returns TriggerResult — extends TypedRun with disposition: 'created' | 'idempotent' | 'coalesced'. Existing code reading run properties works unchanged
  • run:coalesced event — new event for observability, carries skippedInput and skippedLabels
  • releaseExpiredLeases() 2-phase — handles partial unique index safely when expired leases conflict with existing pending runs
  • Breaking: trigger() / batchTrigger() return type, TriggerResponse includes disposition, run:trigger no longer emitted on idempotent hits

Breaking changes

Before After
trigger() returns TypedRun Returns TriggerResult (TypedRun & { disposition })
triggerAndWait() returns { id, output } Returns { id, output, disposition }
batchTrigger() returns TypedRun[] Returns TriggerResult[]
TriggerResponse has runId Has runId + disposition
concurrencyKey allows unlimited pending Max 1 pending per key (ConflictError)
run:trigger emitted on idempotent hits No event on idempotent

Closes #143

Test plan

  • 24 new coalesce test cases (node + browser)
  • All 188 tests pass (pnpm validate)
  • Existing concurrency tests updated for new index constraint
  • Manual test with upflow (post-merge)

🤖 Generated with Claude Code

Summary by CodeRabbit

  • 新機能

    • ジョブトリガーに coalesce: 'skip' を追加し、既存の保留中実行を再利用(disposition: created/idempotent/coalesced)
    • run:coalesced イベントを追加し、統合された実行の通知を提供
    • クライアントが run:coalesced を受信して一覧補充やフォロー中の実行切替を行うよう更新
  • ドキュメント

    • API/ガイド/イベント例に coalesce と disposition、run:coalesced を追加
  • テスト

    • ブラウザ/Node/共有の coalesce テストを追加
  • マイグレーション

    • 保留中同時実行を制約するデータベースインデックスを追加

coji and others added 12 commits March 25, 2026 18:58
Step-by-step plan derived from the RFC. Will be deleted after implementation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Step 1 of coalesce trigger implementation. Type and event definitions
only — no runtime behavior changes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Step 2 of coalesce trigger implementation. Adds a partial unique index
to enforce at most one pending run per (job_name, concurrency_key).
Updates existing concurrency tests to avoid creating multiple pending
runs with the same key (which now violates the constraint).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…erResult (#143)

Step 3 of coalesce trigger implementation. Breaking changes:
- storage.enqueue()/enqueueMany() return { run, disposition }
- trigger() returns TriggerResult (TypedRun & { disposition })
- batchTrigger() returns TriggerResult[]
- triggerAndWait() result includes disposition
- TriggerResponse includes disposition
- run:trigger event no longer emitted for idempotent hits

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…#143)

Step 4 of coalesce trigger implementation:
- parseUniqueViolation() distinguishes idempotency vs concurrency conflicts
- enqueue() catches INSERT conflict, supports coalesce: 'skip' (returns
  existing pending run) or throws ConflictError
- enqueueInTx() refactor: accepts optional transaction for batch atomicity
- enqueueMany() uses single transaction with per-item SAVEPOINTs
- trigger()/batchTrigger() validate coalesce option and emit run:coalesced
- findPendingByConcurrencyKey() shared query with deterministic ordering

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Step 5: expired leases with an existing pending run for the same
concurrencyKey are now failed instead of reset to pending (which would
violate the partial unique index). Remaining leases are reset per-row
with SAVEPOINT to handle concurrent trigger() races.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Step 6: Wire run:coalesced event through the full stack:
- durably-react types.ts: add run:coalesced to DurablyEvent union
- use-runs.ts: refresh runs list on run:coalesced
- use-job-subscription.ts: followLatest reacts to run:coalesced
- server.ts: forward run:coalesced via SSE stream

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The SSE-based client hooks were missing run:coalesced handling:
- client/use-runs.ts: refresh list on run:coalesced events
- client/use-job.ts: followLatest reacts to run:coalesced

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Step 7: Tests covering concurrencyKey pending limit, coalesce skip
behavior, disposition values, events, batchTrigger, triggerAndWait,
validation, and releaseExpiredLeases interaction.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Step 8: Update llms.md with:
- trigger() returns TriggerResult with disposition
- concurrencyKey enforces max 1 pending
- coalesce: 'skip' option
- run:coalesced event
- Updated type definitions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Step 9: PLAN.md served its purpose — all 9 steps implemented.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…pers (#143)

Deduplicate coalesce validation and disposition-based event emission
between trigger() and batchTrigger().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@vercel
Copy link
Copy Markdown

vercel Bot commented Mar 25, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
durably-demo Ready Ready Preview Mar 25, 2026 2:33pm
1 Skipped Deployment
Project Deployment Actions Updated (UTC)
durably-demo-vercel-turso Ignored Ignored Preview Mar 25, 2026 2:33pm

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Mar 25, 2026

Warning

Rate limit exceeded

@coji has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 16 minutes and 18 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 15c4099f-fdbe-4e51-9194-e0e6d1035114

📥 Commits

Reviewing files that changed from the base of the PR and between 9ac579c and 6b6ea4f.

📒 Files selected for processing (2)
  • packages/durably/tests/node/coalesce.postgres.test.ts
  • packages/durably/tests/shared/coalesce.shared.ts
📝 Walkthrough

Walkthrough

トリガーに coalesce: 'skip' を追加し、同じ concurrencyKey の既存ペンディング実行を再利用する挙動を実装。trigger 系の戻り値に disposition を追加し、run:coalesced イベントを導入。DB に部分一意インデックスを追加して原子性を担保。

Changes

Cohort / File(s) Summary
型定義とイベント公開
packages/durably/src/events.ts, packages/durably/src/types.ts, packages/durably/src/index.ts
run:coalesced / RunCoalescedEvent を追加し、DurablyEvent に含める。DispositionTriggerResult 型を公開エクスポートに追加。
ジョブ API とサーバー
packages/durably/src/job.ts, packages/durably/src/server.ts, packages/durably/src/durably.ts
trigger/batchTrigger/triggerAndWait の戻り値に disposition を含めるよう変更。coalesce?: 'skip' オプション検証と優先順位(idempotency > coalesce)を追加。retrigger()enqueue の新戻り値形を想定するよう修正。
ストレージ層と DB ロジック
packages/durably/src/storage.ts, packages/durably/src/migrations.ts
enqueue/enqueueMany{ run, disposition } を返すよう変更。INSERT→ユニーク衝突判別(idempotent vs pending_concurrency)を導入し、coalesce:'skip' は既存 pending を返す(coalesced)。releaseExpiredLeases を二相処理へ再構成。マイグレーションで部分一意インデックスを追加。
クライアント(React)フック
packages/durably-react/src/types.ts, packages/durably-react/src/client/use-job.ts, packages/durably-react/src/client/use-runs.ts, packages/durably-react/src/hooks/use-job-subscription.ts, packages/durably-react/src/hooks/use-runs.ts
クライアント側のイベント型に run:coalesced を追加。SSE/サブスクリプションで run:coalesced を受けて refresh() や follow-latest の currentRunId 切替を行うよう拡張。
ドキュメントと公開例
packages/durably/docs/llms.md, website/public/llms.txt, website/api/*, website/guide/*, website/guide/error-handling.md
coalesce: 'skip'disposition の説明・型・例を追加。run:coalesced イベントのドキュメントと HTTP /trigger の入出力例を更新。
テスト(追加・修正)
packages/durably/tests/shared/coalesce.shared.ts, packages/durably/tests/browser/coalesce.test.ts, packages/durably/tests/node/coalesce.test.ts, packages/durably/tests/**/…
包括的なコアレッシングテストを追加(Node/Browser 方言)。既存テストを enqueue の新戻り値形に合わせて destructuring を多数修正し、一部シーケンス/待機ロジックを強化。

Sequence Diagram

sequenceDiagram
    participant Client
    participant Server
    participant Database
    participant EventEmitter
    participant ReactHook

    Client->>Server: POST /trigger (input, concurrencyKey, coalesce:'skip')
    Server->>Database: INSERT run (status = pending)
    alt Unique constraint violation (pending exists)
        Database-->>Server: constraint error
        Server->>Database: SELECT existing pending run
        Database-->>Server: existing run
        Server->>EventEmitter: emit('run:coalesced', { runId, jobName, skippedInput, skippedLabels })
        Server-->>Client: { run, disposition: 'coalesced' }
    else Insert succeeded
        Database-->>Server: created run
        Server->>EventEmitter: emit('run:trigger', { runId, jobName })
        Server-->>Client: { run, disposition: 'created' }
    end

    EventEmitter->>ReactHook: run:coalesced / run:trigger
    ReactHook->>ReactHook: if followLatest -> switch currentRunId
    ReactHook->>ReactHook: refresh runs list / UI
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Poem

🐰 跳ねる耳で見たよ、キューの列
ひとつ残して、余分はそっと消すよ
同じ鍵の声はまとめて聞くから
coalesce が小道を作るよ、ぴょんと一歩
走れ、disposition の歌を鳴らして

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 46.15% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed PR タイトルは coalesce trigger with concurrencyKey pending limit で、変更セットの主要な機能を正確に説明しており、開発者の視点から重要な変更を簡潔に要約しています。
Linked Issues check ✅ Passed PR は #143 の全ての要件を満たしています。coalesce: 'skip' オプション、disposition の実装、run:coalesced イベント、部分一意インデックス、idempotencyKey の優先順位、エラーハンドリング、両環境のテスト追加が完全に実装されています。
Out of Scope Changes check ✅ Passed 全ての変更は #143 の要件に直接関連しており、範囲外の変更はありません。マイグレーション、ストレージ層、イベントシステム、API エンドポイント、ドキュメント、テストはすべてコアリング機能をサポートしています。

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/coalesce-trigger

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

- releaseExpiredLeases Phase 2: wrap in transaction (PostgreSQL requires
  SAVEPOINTs inside transaction blocks)
- Add SAVEPOINT name reuse safety comment in enqueueInTx
- Add NULL = NULL behavior comment in Phase 1
- Add idempotent no-event comment in emitDispositionEvent
- Test: retrigger with pending same-key run throws ConflictError
- Test: batch with mixed coalesce/non-coalesce for different keys
- Test: idempotencyKey takes priority over concurrencyKey conflict

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
packages/durably/docs/llms.md (1)

289-316: ⚠️ Potential issue | 🟡 Minor

run:trigger の idempotent 非発火も明記しておきたいです

今回の breaking change にはイベント発火条件の変更も入っているので、イベント一覧か run:trigger の説明に「idempotency hit では emit されない」を 1 行足しておくと、利用者と LLM の期待値ずれを防げます。 As per coding guidelines, "Update packages/durably/docs/llms.md whenever API changes are made to keep LLM documentation in sync".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/durably/docs/llms.md` around lines 289 - 316, Add a one-line note to
the Events section next to the run:trigger example clarifying that the
'run:trigger' event is not emitted on an idempotency hit (i.e., when a run is
skipped due to idempotency); update the text around the
durably.on('run:trigger', ...) sample to include that explicit statement so
users and LLMs know idempotent skips do not fire 'run:trigger'.
website/public/llms.txt (1)

1-1451: ⚠️ Potential issue | 🟠 Major

生成ファイルの直接編集を避け、生成コマンド経由にしてください。

website/public/llms.txt は生成物なので、手編集ではなく packages/*/docs/llms.md 側を更新してから再生成した成果物のみをコミットする形に揃えてください。

As per coding guidelines, "Never edit generated files directly; regenerate website/public/llms.txt using pnpm --filter durably-website generate:llms when packages/*/docs/llms.md changes".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@website/public/llms.txt` around lines 1 - 1451, The committed change edited
the generated LLM documentation artifact (llms.txt) directly; instead
restore/revert that file and update the canonical source markdown (llms.md under
the packages/docs source) then run the docs generation command (pnpm --filter
durably-website generate:llms) to regenerate the artifact and commit the
generated output; ensure future edits are made to the source llms.md and not the
generated llms.txt.
🧹 Nitpick comments (1)
packages/durably/tests/shared/coalesce.shared.ts (1)

192-208: idempotencyKey 優先の組み合わせ回帰テストも入れておきたいです

仕様上は idempotencyKeycoalesce より優先なので、{ idempotencyKey, concurrencyKey, coalesce: 'skip' } を同時に渡して idempotent を返すケースを 1 本追加しておくと、storage / job の分岐順序を将来壊しにくいです。

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/durably/tests/shared/coalesce.shared.ts` around lines 192 - 208, Add
a regression test that verifies idempotencyKey takes precedence over coalesce:
add an it block similar to the existing tests that calls d.jobs.job.trigger
twice where the first call uses { idempotencyKey: 'idem-1', concurrencyKey:
'key-1' } and the second call uses { idempotencyKey: 'idem-1', concurrencyKey:
'key-1', coalesce: 'skip' }, then assert the second result's disposition is
'idempotent' (use the same pattern as the other tests calling d.jobs.job.trigger
and expect(second.disposition).toBe('idempotent')).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/durably-react/src/hooks/use-job-subscription.ts`:
- Around line 136-143: The handler for the durably.on('run:coalesced') event
currently dispatches dispatch({ type: 'switch_to_run', runId: event.runId })
which causes the reducer to set status: 'leased'; change the handler so that
when event.jobName === jobName and followLatest is true it dispatches an action
that indicates a coalesced pending run (for example a distinct action type like
'switch_to_pending_run' or the existing 'switch_to_run' but with a payload flag
such as { runId: event.runId, status: 'pending' }), and update the reducer to
handle that action and set status: 'pending' instead of 'leased' so coalesced
runs render as pending rather than leased.

In `@packages/durably/src/storage.ts`:
- Around line 297-323: parseUniqueViolation currently returns null for many
non-unique-violation errors causing insertLabelRows and releaseExpiredLeases
catch blocks to treat connection errors and other failures as
pending-concurrency conflicts; update parseUniqueViolation to detect
driver/SQL-state specific unique-violation signals (e.g. Postgres err.code /
SQLSTATE '23505' and any driver-specific constraint property) and only return
'idempotency' or 'pending_concurrency' when you can confidently identify a
UNIQUE violation (fall back to null otherwise); then update the callers
(insertLabelRows and the Phase 2 catch in releaseExpiredLeases that rely on
findPendingByConcurrencyKey) to check the explicit unique-violation result
instead of treating any null/unknown as a concurrency conflict, and keep
original errors (or rethrow) for connection/unknown failures so retries/logging
can surface real issues.
- Around line 762-801: Phase 2 is using SAVEPOINTs outside a transaction and
unguarded updates that can overwrite concurrent state; wrap the per-row
reset/fail logic in an explicit transaction (use db.transaction or tx passed
into the block) instead of raw sql`SAVEPOINT` calls, remove raw savepoint SQL,
and run the update(s) against tx; when updating durably_runs for a reset or
fail, include the original guards in the WHERE (e.g. where id = row.id AND
status = 'leased' AND lease_expires_at <= now and, if available,
lease_generation = <value>) so you only change rows still leased/expired, and in
the catch only handle unique-violation errors by performing the "failed" update
(still with the guarded WHERE) while rethrowing any other DB errors.

---

Outside diff comments:
In `@packages/durably/docs/llms.md`:
- Around line 289-316: Add a one-line note to the Events section next to the
run:trigger example clarifying that the 'run:trigger' event is not emitted on an
idempotency hit (i.e., when a run is skipped due to idempotency); update the
text around the durably.on('run:trigger', ...) sample to include that explicit
statement so users and LLMs know idempotent skips do not fire 'run:trigger'.

In `@website/public/llms.txt`:
- Around line 1-1451: The committed change edited the generated LLM
documentation artifact (llms.txt) directly; instead restore/revert that file and
update the canonical source markdown (llms.md under the packages/docs source)
then run the docs generation command (pnpm --filter durably-website
generate:llms) to regenerate the artifact and commit the generated output;
ensure future edits are made to the source llms.md and not the generated
llms.txt.

---

Nitpick comments:
In `@packages/durably/tests/shared/coalesce.shared.ts`:
- Around line 192-208: Add a regression test that verifies idempotencyKey takes
precedence over coalesce: add an it block similar to the existing tests that
calls d.jobs.job.trigger twice where the first call uses { idempotencyKey:
'idem-1', concurrencyKey: 'key-1' } and the second call uses { idempotencyKey:
'idem-1', concurrencyKey: 'key-1', coalesce: 'skip' }, then assert the second
result's disposition is 'idempotent' (use the same pattern as the other tests
calling d.jobs.job.trigger and expect(second.disposition).toBe('idempotent')).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 46eb190d-3f1e-4e11-9cd8-aa14295b7250

📥 Commits

Reviewing files that changed from the base of the PR and between 5b8313a and 0389eef.

📒 Files selected for processing (25)
  • packages/durably-react/src/client/use-job.ts
  • packages/durably-react/src/client/use-runs.ts
  • packages/durably-react/src/hooks/use-job-subscription.ts
  • packages/durably-react/src/hooks/use-runs.ts
  • packages/durably-react/src/types.ts
  • packages/durably/docs/llms.md
  • packages/durably/src/durably.ts
  • packages/durably/src/events.ts
  • packages/durably/src/index.ts
  • packages/durably/src/job.ts
  • packages/durably/src/migrations.ts
  • packages/durably/src/server.ts
  • packages/durably/src/storage.ts
  • packages/durably/tests/browser/coalesce.test.ts
  • packages/durably/tests/browser/db-stress.test.ts
  • packages/durably/tests/node/coalesce.test.ts
  • packages/durably/tests/node/migration-consolidated.test.ts
  • packages/durably/tests/shared/coalesce.shared.ts
  • packages/durably/tests/shared/concurrency.shared.ts
  • packages/durably/tests/shared/db-concurrency.shared.ts
  • packages/durably/tests/shared/db-semantics.shared.ts
  • packages/durably/tests/shared/db-stress.shared.ts
  • packages/durably/tests/shared/durably.shared.ts
  • packages/durably/tests/shared/storage.shared.ts
  • website/public/llms.txt

Comment thread packages/durably-react/src/hooks/use-job-subscription.ts
Comment thread packages/durably/src/storage.ts
Comment thread packages/durably/src/storage.ts Outdated
coji and others added 2 commits March 25, 2026 22:56
- api/events.md: add run:coalesced event documentation
- api/define-job.md: trigger/batchTrigger return TriggerResult,
  TriggerOptions.coalesce, triggerAndWait disposition
- api/http-handler.md: coalesce in request, disposition in response
- guide/concepts.md: coalesce option in trigger options example
- guide/error-handling.md: ConflictError handling pattern

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- use-job-subscription.ts: switch_to_run accepts optional status,
  run:coalesced dispatches with status 'pending' (not 'leased')
- storage.ts: add isUniqueViolation() to distinguish unique constraint
  violations from connection/other errors before entering conflict handling
- storage.ts: releaseExpiredLeases Phase 2 adds status/lease_expires_at
  WHERE guards to prevent overwriting concurrent state changes,
  catch block only handles unique violations (rethrows others)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- llms.md: note that run:trigger is not emitted on idempotent hits
- llms.txt: regenerated from source (not edited directly)
- tests: add idempotencyKey precedence over coalesce regression test

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/durably/src/job.ts (1)

338-349: ⚠️ Potential issue | 🟠 Major

検証済み labels をそのまま使ってください。

ここでは validateJobInputOrThrow(labelsSchema, ...) の戻り値を捨てているので、schema の strip/default/coerce 結果が storage.enqueue*()run:coalescedskippedLabels に反映されません。labelsSchema では通る値が後段の validateLabels() で落ちたり、余計なラベルが保存されるので、parse 済み値を保持して使い回した方が安全です。

💡 修正例
-      if (labelsSchema && options?.labels) {
-        validateJobInputOrThrow(labelsSchema, options.labels, 'labels')
-      }
+      const validatedLabels =
+        labelsSchema && options?.labels
+          ? validateJobInputOrThrow(labelsSchema, options.labels, 'labels')
+          : options?.labels

       const { run, disposition } = await storage.enqueue({
         jobName: jobDef.name,
         input: validatedInput,
         idempotencyKey: options?.idempotencyKey,
         concurrencyKey: options?.concurrencyKey,
-        labels: options?.labels,
+        labels: validatedLabels,
         coalesce: options?.coalesce,
       })

       emitDispositionEvent(
         disposition,
         run,
         validatedInput,
-        options?.labels as Record<string, string>,
+        validatedLabels as Record<string, string>,
       )
@@
-        if (labelsSchema && opts?.labels) {
-          validateJobInputOrThrow(
-            labelsSchema,
-            opts.labels,
-            `labels at index ${i}`,
-          )
-        }
+        const validatedLabels =
+          labelsSchema && opts?.labels
+            ? validateJobInputOrThrow(
+                labelsSchema,
+                opts.labels,
+                `labels at index ${i}`,
+              )
+            : opts?.labels
         validated.push({
           input: validatedInput,
-          options: opts,
+          options: { ...opts, labels: validatedLabels },
         })

Also applies to: 352-357, 502-512, 517-523, 527-534

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/durably/src/job.ts` around lines 338 - 349, The code currently calls
validateJobInputOrThrow(labelsSchema, options.labels, 'labels') and discards its
return value, so schema-driven parsing/stripping/coercion isn’t applied when
passing labels to storage.enqueue or when building run.coalesced.skippedLabels;
change the call to capture the parsed labels (e.g., const validatedLabels =
validateJobInputOrThrow(...)) and use validatedLabels wherever options.labels is
forwarded (notably in storage.enqueue and any places that set
run.coalesced.skippedLabels or call validateLabels). Update all similar spots
that validate labels (the blocks around the storage.enqueue call and the other
listed ranges) to use the parsed/returned value instead of the original
options.labels.
♻️ Duplicate comments (2)
packages/durably/src/storage.ts (2)

479-499: ⚠️ Potential issue | 🟠 Major

未分類エラーを pending 競合に畳み込まないでください。

violation === null は「pending 競合」ではなく「ここでは分類できなかった失敗」です。今の分岐だと concurrencyKey があるだけで INSERT / label 書き込み / 接続系の障害まで ConflictErrorcoalesced に化けるので、pending 競合扱いは明示的に識別できたケースだけに絞るべきです。

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/durably/src/storage.ts` around lines 479 - 499, The code currently
treats violation === null as a pending concurrency conflict which incorrectly
collapses unknown/unclassified errors into pending conflicts; in the block that
checks for pending concurrency (using parseUniqueViolation -> violation and
input.concurrencyKey), remove the "|| violation === null" condition so only
violation === 'pending_concurrency' is handled as a concurrency conflict, and
ensure the null/unknown branch falls through (or is rethrown/handled elsewhere)
instead of being converted to a ConflictError/coalesced result.

769-809: ⚠️ Potential issue | 🔴 Critical

Phase 2 の更新は再ガードしないと別ワーカーの状態を上書きします。

remaining を読んだ後の reset/fail update が id だけなので、その間に completeRun() / cancelRun() / renewLease() / 別の releaseExpiredLeases() が進むと terminal / renewed 状態を pending / failed に戻せてしまいます。ここは少なくとも status = 'leased'lease_expires_at <= now(できれば元の lease_generation も)で再ガードし、0 件更新は count に含めず、unique 以外の例外は再送出してください。

💡 修正例
-      const remaining = await db
+      const remaining = await db
         .selectFrom('durably_runs')
-        .select('id')
+        .select(['id', 'lease_generation'])
@@
-              await trx
+              const reset = await trx
                 .updateTable('durably_runs')
                 .set({
                   status: 'pending',
                   lease_owner: null,
                   lease_expires_at: null,
                   updated_at: now,
                 })
                 .where('id', '=', row.id)
-                .execute()
-              count++
-            } catch {
+                .where('status', '=', 'leased')
+                .where('lease_expires_at', 'is not', null)
+                .where('lease_expires_at', '<=', now)
+                .where('lease_generation', '=', row.lease_generation)
+                .executeTakeFirst()
+              count += Number(reset.numUpdatedRows)
+            } catch (err) {
+              if (parseUniqueViolation(err) !== 'pending_concurrency') throw err
               await sql`ROLLBACK TO SAVEPOINT sp_release`.execute(trx)
-              await trx
+              const failed = await trx
                 .updateTable('durably_runs')
                 .set({
                   status: 'failed',
                   error: 'Lease expired; pending run already exists',
                   lease_owner: null,
                   lease_expires_at: null,
                   completed_at: now,
                   updated_at: now,
                 })
                 .where('id', '=', row.id)
-                .execute()
-              count++
+                .where('status', '=', 'leased')
+                .where('lease_expires_at', 'is not', null)
+                .where('lease_expires_at', '<=', now)
+                .where('lease_generation', '=', row.lease_generation)
+                .executeTakeFirst()
+              count += Number(failed.numUpdatedRows)
             }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/durably/src/storage.ts` around lines 769 - 809, The phase-2 updates
currently only filter by id and can overwrite concurrent state changes; modify
the transactional update in the loop that sets status back to 'pending' (the
trx.updateTable('durably_runs') call inside the try after SAVEPOINT sp_release)
to re-guard the update by adding conditions: status = 'leased' AND
lease_expires_at <= now (and include lease_generation if available) so it only
updates when the lease still matches; after the update check the affected row
count and only increment count when rowsAffected > 0 (do not count 0-updates).
In the catch block (around ROLLBACK TO SAVEPOINT sp_release and the subsequent
fail update) only handle unique-violation errors specially (perform the fail
update and increment count); for any other exception rethrow it. Reference the
loop over remaining, the SAVEPOINT sp_release/ROLLBACK/RELEASE calls, the two
trx.updateTable('durably_runs') calls, and the count variable when making these
changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@website/api/define-job.md`:
- Around line 82-83:
「coalesce」オプションだけが有効に見える説明になっているので、オプション表の「coalesce?」行に制約を追記して、(1) coalesce は
concurrencyKey が指定されている場合にのみ有効であること、(2) idempotencyKey がヒットした場合の disposition は
"coalesced" ではなく "idempotent" になることを明記してください;該当する記述(options 表の coalesce
行と同様の箇所、及び disposition の説明があるブロック)を更新して、coalesce と concurrencyKey/idempotencyKey
の関係と期待される disposition の分岐を短い一文で明確に示してください。

In `@website/api/http-handler.md`:
- Around line 132-140: 説明に「coalesce」の前提条件と返却値の意味を明確化してください: coalesce が有効になるのは
concurrencyKey が指定されている場合のみで、idempotencyKey が一致する場合は coalesced ではなく disposition
に "idempotent" を返すこと、また disposition !== "created" の場合の runId は既存の run
を指すことを追記してください。さらに SSE の挙動について run:coalesced イベントが発生することと、coalesced / idempotent
のケースでは run:trigger が発火しないことを明記して、クライアントが分岐処理を誤らないようにしてください(参照: coalesce,
concurrencyKey, idempotencyKey, disposition, runId, SSE events
run:coalesced/run:trigger)。

---

Outside diff comments:
In `@packages/durably/src/job.ts`:
- Around line 338-349: The code currently calls
validateJobInputOrThrow(labelsSchema, options.labels, 'labels') and discards its
return value, so schema-driven parsing/stripping/coercion isn’t applied when
passing labels to storage.enqueue or when building run.coalesced.skippedLabels;
change the call to capture the parsed labels (e.g., const validatedLabels =
validateJobInputOrThrow(...)) and use validatedLabels wherever options.labels is
forwarded (notably in storage.enqueue and any places that set
run.coalesced.skippedLabels or call validateLabels). Update all similar spots
that validate labels (the blocks around the storage.enqueue call and the other
listed ranges) to use the parsed/returned value instead of the original
options.labels.

---

Duplicate comments:
In `@packages/durably/src/storage.ts`:
- Around line 479-499: The code currently treats violation === null as a pending
concurrency conflict which incorrectly collapses unknown/unclassified errors
into pending conflicts; in the block that checks for pending concurrency (using
parseUniqueViolation -> violation and input.concurrencyKey), remove the "||
violation === null" condition so only violation === 'pending_concurrency' is
handled as a concurrency conflict, and ensure the null/unknown branch falls
through (or is rethrown/handled elsewhere) instead of being converted to a
ConflictError/coalesced result.
- Around line 769-809: The phase-2 updates currently only filter by id and can
overwrite concurrent state changes; modify the transactional update in the loop
that sets status back to 'pending' (the trx.updateTable('durably_runs') call
inside the try after SAVEPOINT sp_release) to re-guard the update by adding
conditions: status = 'leased' AND lease_expires_at <= now (and include
lease_generation if available) so it only updates when the lease still matches;
after the update check the affected row count and only increment count when
rowsAffected > 0 (do not count 0-updates). In the catch block (around ROLLBACK
TO SAVEPOINT sp_release and the subsequent fail update) only handle
unique-violation errors specially (perform the fail update and increment count);
for any other exception rethrow it. Reference the loop over remaining, the
SAVEPOINT sp_release/ROLLBACK/RELEASE calls, the two
trx.updateTable('durably_runs') calls, and the count variable when making these
changes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 25e47986-6c0f-4368-952d-fbed622140a7

📥 Commits

Reviewing files that changed from the base of the PR and between 0389eef and 51053df.

📒 Files selected for processing (8)
  • packages/durably/src/job.ts
  • packages/durably/src/storage.ts
  • packages/durably/tests/shared/coalesce.shared.ts
  • website/api/define-job.md
  • website/api/events.md
  • website/api/http-handler.md
  • website/guide/concepts.md
  • website/guide/error-handling.md
✅ Files skipped from review due to trivial changes (3)
  • website/guide/error-handling.md
  • website/guide/concepts.md
  • website/api/events.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/durably/tests/shared/coalesce.shared.ts

Comment thread website/api/define-job.md
Comment thread website/api/http-handler.md
- job.ts: use validated labels (parseJobInputOrThrow result) instead of
  raw options.labels for storage.enqueue and event emission
- storage.ts: add comment explaining violation === null safety (after
  isUniqueViolation gate, only idempotency and pending_concurrency
  constraints exist on this table)
- define-job.md: note coalesce requires concurrencyKey, idempotency
  takes priority
- http-handler.md: document coalesce prerequisites, disposition meaning,
  SSE behavior for coalesced/idempotent triggers

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@coji
Copy link
Copy Markdown
Owner Author

coji commented Mar 25, 2026

CodeRabbit Review Round 2 — All addressed in 65522dc

Outside diff: validated labels (job.ts:338-349)

Fixed. validateJobInputOrThrow return value is now captured and used for both storage.enqueue() and emitDispositionEvent(). Same fix applied to batchTrigger().

Duplicate: violation === null (storage.ts:479-499)

This is safe after the isUniqueViolation(err) gate added in 856ecb1. If we reach parseUniqueViolation, the error is confirmed to be a UNIQUE violation. violation === null means "UNIQUE violation but couldn't identify which constraint" — given this table only has two UNIQUE constraints (idempotency, checked above, and pending_concurrency), treating null as pending_concurrency is correct. Added a comment explaining this reasoning.

Duplicate: Phase 2 re-guard (storage.ts:769-809)

Already fixed in 856ecb1 — WHERE clauses include status = 'leased' and lease_expires_at <= now, and catch only handles isUniqueViolation errors.

Phase 2 count was inflated when concurrent workers already handled the
row (WHERE guards cause 0-row updates). Now checks numUpdatedRows.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Reuses shared coalesce test suite with PostgreSQL dialect.
Excluded from `pnpm test` by default (*.postgres.test.ts pattern),
run with `pnpm --filter @coji/durably test:node:postgres`.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
)

- Use createPostgresSchemaResource for isolated schema per test suite
- Clean up run/step/label/log data in afterEach (PostgreSQL shares
  schema across tests, unlike SQLite temp files)
- All 28 coalesce tests pass on PostgreSQL

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: coalesce option for trigger (skip mode)

1 participant