From 72b5aae903d895775977f869270c96da57d490a0 Mon Sep 17 00:00:00 2001 From: Cristiano Carvalho Date: Thu, 21 May 2026 22:21:38 -0300 Subject: [PATCH 1/2] fix: reclaim expired item leases Allow expired item leases to become visible and be claimed by another worker. Also align retry backoff so the first retry uses the configured base delay instead of doubling it. --- lib/bedrock/job_queue/item.ex | 11 ++++- lib/bedrock/job_queue/store.ex | 12 +++-- test/bedrock/job_queue/store_test.exs | 68 +++++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 5 deletions(-) diff --git a/lib/bedrock/job_queue/item.ex b/lib/bedrock/job_queue/item.ex index d5db0c0..f959a37 100644 --- a/lib/bedrock/job_queue/item.ex +++ b/lib/bedrock/job_queue/item.ex @@ -83,7 +83,11 @@ defmodule Bedrock.JobQueue.Item do end @doc """ - Returns true if the job is currently visible (vesting_time has passed and not leased). + Returns true if the job is currently visible. + + An item is visible when its vesting time has passed and it is not actively + leased. Expired leases are considered visible so another worker can reclaim + stale work. """ @spec visible?(t()) :: boolean() @spec visible?(t(), non_neg_integer()) :: boolean() @@ -91,6 +95,11 @@ defmodule Bedrock.JobQueue.Item do def visible?(%__MODULE__{vesting_time: vt, lease_id: nil}, now), do: now >= vt + def visible?(%__MODULE__{vesting_time: vt, lease_expires_at: exp}, now) + when not is_nil(exp) do + now >= vt and now >= exp + end + def visible?(%__MODULE__{}, _now), do: false @doc """ diff --git a/lib/bedrock/job_queue/store.ex b/lib/bedrock/job_queue/store.ex index cb7b7bd..0bc324e 100644 --- a/lib/bedrock/job_queue/store.ex +++ b/lib/bedrock/job_queue/store.ex @@ -271,10 +271,11 @@ defmodule Bedrock.JobQueue.Store do value -> current_item = decode(value) - if current_item.lease_id == nil do + if not Item.leased?(current_item, now: now) do # Create lease lease = Lease.new(current_item, holder, duration_ms: duration_ms, now: now) lease_expires_at = now + duration_ms + pending_item? = current_item.lease_id == nil # Update item with lease info and new vesting_time updated_item = %{ @@ -296,7 +297,10 @@ defmodule Bedrock.JobQueue.Store do # Update pointer and stats update_pointer(repo, pointers, lease_expires_at, item.queue_id, now) - update_stats(repo, keyspaces, -1, 1) + + if pending_item? do + update_stats(repo, keyspaces, -1, 1) + end {:ok, lease} else @@ -497,13 +501,13 @@ defmodule Bedrock.JobQueue.Store do # Explicit base_delay overrides backoff_fn (used by snooze) base_delay = Keyword.get(opts, :base_delay) -> max_delay = Keyword.get(opts, :max_delay, 60_000) - (base_delay * :math.pow(2, error_count)) |> trunc() |> min(max_delay) + (base_delay * :math.pow(2, error_count - 1)) |> trunc() |> min(max_delay) backoff_fn = Keyword.get(opts, :backoff_fn) -> backoff_fn.(error_count) true -> - (1000 * :math.pow(2, error_count)) |> trunc() |> min(60_000) + (1000 * :math.pow(2, error_count - 1)) |> trunc() |> min(60_000) end end diff --git a/test/bedrock/job_queue/store_test.exs b/test/bedrock/job_queue/store_test.exs index dcac988..3222c0b 100644 --- a/test/bedrock/job_queue/store_test.exs +++ b/test/bedrock/job_queue/store_test.exs @@ -152,6 +152,18 @@ defmodule Bedrock.JobQueue.StoreTest do refute Item.visible?(item, now) end + + test "items with expired lease are visible again" do + now = 10_000 + + item = %{ + Item.new("queue", "topic", %{}, vesting_time: 9_000) + | lease_id: <<1, 2, 3>>, + lease_expires_at: 9_000 + } + + assert Item.visible?(item, now) + end end describe "Lease creation" do @@ -353,6 +365,62 @@ defmodule Bedrock.JobQueue.StoreTest do assert {:ok, []} = result end + + test "reclaims items whose lease expired" do + now = 10_000 + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{n: 1}, vesting_time: 8_000) + expired_lease = Lease.new(item, "old_holder", duration_ms: 1_000, now: 8_000) + + leased_item = %{ + item + | lease_id: expired_lease.id, + lease_expires_at: expired_lease.expires_at, + vesting_time: expired_lease.expires_at + } + + {:ok, store} = start_mock_store() + setup_integration_stubs(MockRepo, store) + store_item(store, keyspaces.items, leased_item) + + assert [%Item{id: item_id}] = Store.peek(MockRepo, root(), queue_id, now: now) + assert item_id == item.id + + assert {:ok, [%Lease{holder: "new_holder"}]} = + Store.dequeue(MockRepo, root(), queue_id, "new_holder", + now: now, + lease_duration: 5_000 + ) + end + end + + describe "requeue/4" do + test "uses base_delay for the first retry visibility time" do + now = 10_000 + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{n: 1}, max_retries: 3, vesting_time: now) + lease = Lease.new(item, "holder", duration_ms: 5_000, now: now) + + leased_item = %{ + item + | lease_id: lease.id, + lease_expires_at: lease.expires_at, + vesting_time: lease.expires_at + } + + {:ok, store} = start_mock_store() + setup_integration_stubs(MockRepo, store) + store_item(store, keyspaces.items, leased_item) + + assert {:ok, :requeued} = Store.requeue(MockRepo, root(), lease, now: now, base_delay: 1_000) + assert [] = Store.peek(MockRepo, root(), queue_id, now: now + 999) + assert [%Item{id: item_id, error_count: 1}] = + Store.peek(MockRepo, root(), queue_id, now: now + 1_000) + + assert item_id == item.id + end end describe "gc_stale_pointers/3" do From a9712550ab5d72ae66895cde75df4ec591763bfd Mon Sep 17 00:00:00 2001 From: Cristiano Carvalho Date: Thu, 21 May 2026 22:27:57 -0300 Subject: [PATCH 2/2] refactor: simplify lease acquisition flow Extract the lease write path so expired lease reclamation keeps the same behavior while satisfying the strict Credo checks used by CI. --- lib/bedrock/job_queue/store.ex | 55 ++++++++++++++++------------------ 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/lib/bedrock/job_queue/store.ex b/lib/bedrock/job_queue/store.ex index 0bc324e..5d25df8 100644 --- a/lib/bedrock/job_queue/store.ex +++ b/lib/bedrock/job_queue/store.ex @@ -271,42 +271,39 @@ defmodule Bedrock.JobQueue.Store do value -> current_item = decode(value) - if not Item.leased?(current_item, now: now) do - # Create lease - lease = Lease.new(current_item, holder, duration_ms: duration_ms, now: now) - lease_expires_at = now + duration_ms - pending_item? = current_item.lease_id == nil - - # Update item with lease info and new vesting_time - updated_item = %{ - current_item - | lease_id: lease.id, - lease_expires_at: lease_expires_at, - vesting_time: lease_expires_at - } + if Item.leased?(current_item, now: now) do + {:error, :already_leased} + else + do_obtain_lease(repo, keyspaces, pointers, current_item, holder, duration_ms, now) + end + end + end - # Delete old item key (vesting_time changed) - repo.clear(keyspaces.items, item_key) + defp do_obtain_lease(repo, keyspaces, pointers, current_item, holder, duration_ms, now) do + lease = Lease.new(current_item, holder, duration_ms: duration_ms, now: now) + lease_expires_at = now + duration_ms + pending_item? = current_item.lease_id == nil - # Write with new key (new vesting_time) - new_item_key = Item.key(updated_item) - repo.put(keyspaces.items, new_item_key, encode(updated_item)) + updated_item = %{ + current_item + | lease_id: lease.id, + lease_expires_at: lease_expires_at, + vesting_time: lease_expires_at + } - # Write lease record - repo.put(keyspaces.leases, lease.item_id, encode(lease)) + repo.clear(keyspaces.items, Item.key(current_item)) - # Update pointer and stats - update_pointer(repo, pointers, lease_expires_at, item.queue_id, now) + new_item_key = Item.key(updated_item) + repo.put(keyspaces.items, new_item_key, encode(updated_item)) + repo.put(keyspaces.leases, lease.item_id, encode(lease)) - if pending_item? do - update_stats(repo, keyspaces, -1, 1) - end + update_pointer(repo, pointers, lease_expires_at, current_item.queue_id, now) - {:ok, lease} - else - {:error, :already_leased} - end + if pending_item? do + update_stats(repo, keyspaces, -1, 1) end + + {:ok, lease} end @doc """