diff --git a/lib/bedrock/job_queue/item.ex b/lib/bedrock/job_queue/item.ex index d5db0c0..f959a37 100644 --- a/lib/bedrock/job_queue/item.ex +++ b/lib/bedrock/job_queue/item.ex @@ -83,7 +83,11 @@ defmodule Bedrock.JobQueue.Item do end @doc """ - Returns true if the job is currently visible (vesting_time has passed and not leased). + Returns true if the job is currently visible. + + An item is visible when its vesting time has passed and it is not actively + leased. Expired leases are considered visible so another worker can reclaim + stale work. """ @spec visible?(t()) :: boolean() @spec visible?(t(), non_neg_integer()) :: boolean() @@ -91,6 +95,11 @@ defmodule Bedrock.JobQueue.Item do def visible?(%__MODULE__{vesting_time: vt, lease_id: nil}, now), do: now >= vt + def visible?(%__MODULE__{vesting_time: vt, lease_expires_at: exp}, now) + when not is_nil(exp) do + now >= vt and now >= exp + end + def visible?(%__MODULE__{}, _now), do: false @doc """ diff --git a/lib/bedrock/job_queue/store.ex b/lib/bedrock/job_queue/store.ex index cb7b7bd..5d25df8 100644 --- a/lib/bedrock/job_queue/store.ex +++ b/lib/bedrock/job_queue/store.ex @@ -271,38 +271,39 @@ defmodule Bedrock.JobQueue.Store do value -> current_item = decode(value) - if current_item.lease_id == nil do - # Create lease - lease = Lease.new(current_item, holder, duration_ms: duration_ms, now: now) - lease_expires_at = now + duration_ms + if Item.leased?(current_item, now: now) do + {:error, :already_leased} + else + do_obtain_lease(repo, keyspaces, pointers, current_item, holder, duration_ms, now) + end + end + end - # Update item with lease info and new vesting_time - updated_item = %{ - current_item - | lease_id: lease.id, - lease_expires_at: lease_expires_at, - vesting_time: lease_expires_at - } + defp do_obtain_lease(repo, keyspaces, pointers, current_item, holder, duration_ms, now) do + lease = Lease.new(current_item, holder, duration_ms: duration_ms, now: now) + lease_expires_at = now + duration_ms + pending_item? = current_item.lease_id == nil - # Delete old item key (vesting_time changed) - repo.clear(keyspaces.items, item_key) + updated_item = %{ + current_item + | lease_id: lease.id, + lease_expires_at: lease_expires_at, + vesting_time: lease_expires_at + } - # Write with new key (new vesting_time) - new_item_key = Item.key(updated_item) - repo.put(keyspaces.items, new_item_key, encode(updated_item)) + repo.clear(keyspaces.items, Item.key(current_item)) - # Write lease record - repo.put(keyspaces.leases, lease.item_id, encode(lease)) + new_item_key = Item.key(updated_item) + repo.put(keyspaces.items, new_item_key, encode(updated_item)) + repo.put(keyspaces.leases, lease.item_id, encode(lease)) - # Update pointer and stats - update_pointer(repo, pointers, lease_expires_at, item.queue_id, now) - update_stats(repo, keyspaces, -1, 1) + update_pointer(repo, pointers, lease_expires_at, current_item.queue_id, now) - {:ok, lease} - else - {:error, :already_leased} - end + if pending_item? do + update_stats(repo, keyspaces, -1, 1) end + + {:ok, lease} end @doc """ @@ -497,13 +498,13 @@ defmodule Bedrock.JobQueue.Store do # Explicit base_delay overrides backoff_fn (used by snooze) base_delay = Keyword.get(opts, :base_delay) -> max_delay = Keyword.get(opts, :max_delay, 60_000) - (base_delay * :math.pow(2, error_count)) |> trunc() |> min(max_delay) + (base_delay * :math.pow(2, error_count - 1)) |> trunc() |> min(max_delay) backoff_fn = Keyword.get(opts, :backoff_fn) -> backoff_fn.(error_count) true -> - (1000 * :math.pow(2, error_count)) |> trunc() |> min(60_000) + (1000 * :math.pow(2, error_count - 1)) |> trunc() |> min(60_000) end end diff --git a/test/bedrock/job_queue/store_test.exs b/test/bedrock/job_queue/store_test.exs index dcac988..3222c0b 100644 --- a/test/bedrock/job_queue/store_test.exs +++ b/test/bedrock/job_queue/store_test.exs @@ -152,6 +152,18 @@ defmodule Bedrock.JobQueue.StoreTest do refute Item.visible?(item, now) end + + test "items with expired lease are visible again" do + now = 10_000 + + item = %{ + Item.new("queue", "topic", %{}, vesting_time: 9_000) + | lease_id: <<1, 2, 3>>, + lease_expires_at: 9_000 + } + + assert Item.visible?(item, now) + end end describe "Lease creation" do @@ -353,6 +365,62 @@ defmodule Bedrock.JobQueue.StoreTest do assert {:ok, []} = result end + + test "reclaims items whose lease expired" do + now = 10_000 + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{n: 1}, vesting_time: 8_000) + expired_lease = Lease.new(item, "old_holder", duration_ms: 1_000, now: 8_000) + + leased_item = %{ + item + | lease_id: expired_lease.id, + lease_expires_at: expired_lease.expires_at, + vesting_time: expired_lease.expires_at + } + + {:ok, store} = start_mock_store() + setup_integration_stubs(MockRepo, store) + store_item(store, keyspaces.items, leased_item) + + assert [%Item{id: item_id}] = Store.peek(MockRepo, root(), queue_id, now: now) + assert item_id == item.id + + assert {:ok, [%Lease{holder: "new_holder"}]} = + Store.dequeue(MockRepo, root(), queue_id, "new_holder", + now: now, + lease_duration: 5_000 + ) + end + end + + describe "requeue/4" do + test "uses base_delay for the first retry visibility time" do + now = 10_000 + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{n: 1}, max_retries: 3, vesting_time: now) + lease = Lease.new(item, "holder", duration_ms: 5_000, now: now) + + leased_item = %{ + item + | lease_id: lease.id, + lease_expires_at: lease.expires_at, + vesting_time: lease.expires_at + } + + {:ok, store} = start_mock_store() + setup_integration_stubs(MockRepo, store) + store_item(store, keyspaces.items, leased_item) + + assert {:ok, :requeued} = Store.requeue(MockRepo, root(), lease, now: now, base_delay: 1_000) + assert [] = Store.peek(MockRepo, root(), queue_id, now: now + 999) + assert [%Item{id: item_id, error_count: 1}] = + Store.peek(MockRepo, root(), queue_id, now: now + 1_000) + + assert item_id == item.id + end end describe "gc_stale_pointers/3" do