From e17096bd3a236dbff3bccbbad50b24fd781c11e5 Mon Sep 17 00:00:00 2001 From: Cristiano Carvalho Date: Wed, 3 Jun 2026 09:32:30 -0300 Subject: [PATCH 1/6] fix: scan tuple item keys as raw ranges Avoid using keyspace range decoding for queue item scans because item keys are tuple-encoded as {priority, vesting_time, id}. The keyspace decoder expects one complete tuple value and raises when a range scan returns nested tuple key suffixes. Scan raw key ranges by prefix for peek, minimum vesting time, and queue-empty checks instead. --- lib/bedrock/job_queue/store.ex | 14 +++++++++++--- test/bedrock/job_queue/store_test.exs | 25 +++++++++++++++++++++++-- test/support/store_test_helpers.ex | 14 ++++++++------ 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/lib/bedrock/job_queue/store.ex b/lib/bedrock/job_queue/store.ex index 5d25df8..dc01c21 100644 --- a/lib/bedrock/job_queue/store.ex +++ b/lib/bedrock/job_queue/store.ex @@ -192,7 +192,7 @@ defmodule Bedrock.JobQueue.Store do # Uses Stream to avoid loading all items into memory # Stops early once we have enough visible items OR hit max_scan keyspaces.items - |> repo.get_range(limit: max_scan) + |> raw_keyspace_range(repo, limit: max_scan) |> Stream.map(fn {_key, value} -> decode(value) end) |> Stream.filter(&Item.visible?(&1, now)) |> Enum.take(limit) @@ -542,7 +542,7 @@ defmodule Bedrock.JobQueue.Store do # Scan all items and find minimum vesting_time # Items are sorted by {priority, vesting_time, id}, so we need to check all keyspaces.items - |> repo.get_range(limit: limit) + |> raw_keyspace_range(repo, limit: limit) |> Enum.reduce(nil, fn {_key, value}, acc -> item = decode(value) @@ -687,7 +687,7 @@ defmodule Bedrock.JobQueue.Store do defp queue_empty?(repo, root, queue_id) do keyspaces = queue_keyspaces(root, queue_id) - repo.get_range(keyspaces.items, limit: 1) == [] + raw_keyspace_range(keyspaces.items, repo, limit: 1) == [] end # Private helpers @@ -720,6 +720,14 @@ defmodule Bedrock.JobQueue.Store do defp decode_timestamp(nil), do: 0 defp decode_timestamp(<>), do: time + defp raw_keyspace_range(keyspace, repo, opts) do + prefix = Keyspace.prefix(keyspace) + + prefix + |> Bedrock.KeyRange.from_prefix() + |> repo.get_range(opts) + end + # Atomically updates pending and processing stats defp update_stats(repo, keyspaces, pending_delta, processing_delta) do if pending_delta != 0 do diff --git a/test/bedrock/job_queue/store_test.exs b/test/bedrock/job_queue/store_test.exs index 3222c0b..8974923 100644 --- a/test/bedrock/job_queue/store_test.exs +++ b/test/bedrock/job_queue/store_test.exs @@ -81,6 +81,27 @@ defmodule Bedrock.JobQueue.StoreTest do end describe "peek/4 priority ordering" do + test "scans raw key ranges for tuple-encoded item keys" do + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{}, priority: 100, vesting_time: 1000) + encoded_item = :erlang.term_to_binary(item) + packed_item_key = Keyspace.pack(keyspaces.items, Item.key(item)) + + expect(MockRepo, :get_range, fn + %Keyspace{}, _opts -> + flunk("tuple-encoded item keyspaces must be scanned as raw ranges") + + {start_key, end_key}, _opts when is_binary(start_key) and is_binary(end_key) -> + assert packed_item_key >= start_key + assert packed_item_key < end_key + [{packed_item_key, encoded_item}] + end) + + assert [%Item{id: item_id}] = Store.peek(MockRepo, root(), queue_id, limit: 10, now: 2000) + assert item_id == item.id + end + test "returns items in priority order (lowest number first)" do # Create items with different priorities high_priority = Item.new("tenant_1", "topic", %{}, priority: 10, vesting_time: 1000) @@ -95,7 +116,7 @@ defmodule Bedrock.JobQueue.StoreTest do ] # Mock returns items in arbitrary order - peek should sort by key - expect(MockRepo, :get_range, fn %Keyspace{}, _opts -> + expect(MockRepo, :get_range, fn {_start_key, _end_key}, _opts -> # Return sorted by key (simulating DB behavior) Enum.sort_by(items, fn {key, _} -> key end) end) @@ -119,7 +140,7 @@ defmodule Bedrock.JobQueue.StoreTest do {Item.key(earlier), :erlang.term_to_binary(earlier)} ] - expect(MockRepo, :get_range, fn %Keyspace{}, _opts -> + expect(MockRepo, :get_range, fn {_start_key, _end_key}, _opts -> Enum.sort_by(items, fn {key, _} -> key end) end) diff --git a/test/support/store_test_helpers.ex b/test/support/store_test_helpers.ex index 97d45cf..655af5d 100644 --- a/test/support/store_test_helpers.ex +++ b/test/support/store_test_helpers.ex @@ -123,17 +123,19 @@ defmodule Bedrock.JobQueue.Test.StoreHelpers do # ============================================================================ @doc """ - Expects a get_range on the items keyspace for a specific queue_id. - Verifies the keyspace contains items path and returns the given items. + Expects a raw get_range over the items keyspace for a specific queue_id. + Verifies the key range contains the queue items path and returns the given items. Items should be a list of `{key, encoded_value}` tuples. """ def expect_peek(repo, queue_id, items) do - expect(repo, :get_range, fn %Keyspace{} = ks, opts -> - prefix = Keyspace.prefix(ks) + root = Keyspace.new("job_queue/") + keyspaces = Bedrock.JobQueue.Store.queue_keyspaces(root, queue_id) + prefix = Keyspace.prefix(keyspaces.items) - assert String.contains?(prefix, "queues/#{queue_id}/items/"), - "Expected items keyspace for queue #{queue_id}, got: #{prefix}" + expect(repo, :get_range, fn {start_key, end_key}, opts -> + assert start_key == prefix + assert end_key > prefix assert is_list(opts), "Expected opts to be a list" items From 4c5df7153129388fc37a2868a60a521b7528e979 Mon Sep 17 00:00:00 2001 From: Cristiano Carvalho Date: Wed, 3 Jun 2026 09:40:52 -0300 Subject: [PATCH 2/6] fix: satisfy store helper credo --- test/support/store_test_helpers.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/support/store_test_helpers.ex b/test/support/store_test_helpers.ex index 655af5d..82fbbdb 100644 --- a/test/support/store_test_helpers.ex +++ b/test/support/store_test_helpers.ex @@ -28,6 +28,7 @@ defmodule Bedrock.JobQueue.Test.StoreHelpers do alias Bedrock.JobQueue.Item alias Bedrock.JobQueue.Lease alias Bedrock.JobQueue.QueueLease + alias Bedrock.JobQueue.Store alias Bedrock.Keyspace # ============================================================================ @@ -130,7 +131,7 @@ defmodule Bedrock.JobQueue.Test.StoreHelpers do """ def expect_peek(repo, queue_id, items) do root = Keyspace.new("job_queue/") - keyspaces = Bedrock.JobQueue.Store.queue_keyspaces(root, queue_id) + keyspaces = Store.queue_keyspaces(root, queue_id) prefix = Keyspace.prefix(keyspaces.items) expect(repo, :get_range, fn {start_key, end_key}, opts -> From 1c367f1911223ffdf47650f9499a33d8cd68501c Mon Sep 17 00:00:00 2001 From: Cristiano Carvalho Date: Wed, 3 Jun 2026 10:24:17 -0300 Subject: [PATCH 3/6] fix: use stored lease keys for terminal actions --- lib/bedrock/job_queue/store.ex | 7 +-- test/bedrock/job_queue/store_test.exs | 62 +++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/lib/bedrock/job_queue/store.ex b/lib/bedrock/job_queue/store.ex index dc01c21..7b8f096 100644 --- a/lib/bedrock/job_queue/store.ex +++ b/lib/bedrock/job_queue/store.ex @@ -387,8 +387,7 @@ defmodule Bedrock.JobQueue.Store do case verify_lease(repo, keyspaces, lease) do {:ok, stored_lease} -> - # Use provided item_key or fall back to stored lease's item_key - item_key = lease.item_key || stored_lease.item_key + item_key = stored_lease.item_key repo.clear(keyspaces.items, item_key) repo.clear(keyspaces.leases, lease.item_id) @@ -430,16 +429,12 @@ defmodule Bedrock.JobQueue.Store do pointers = pointer_keyspace(root) now = Keyword.get(opts, :now) || System.system_time(:millisecond) - # Get item_key from lease or fetch from stored lease with {:ok, item_key} <- resolve_item_key(repo, keyspaces, lease), {:ok, item} <- fetch_item(repo, keyspaces, item_key) do do_requeue(repo, keyspaces, pointers, lease, item, item_key, opts, now) end end - defp resolve_item_key(_repo, _keyspaces, %Lease{item_key: item_key}) when item_key != nil, - do: {:ok, item_key} - defp resolve_item_key(repo, keyspaces, %Lease{} = lease) do case verify_lease(repo, keyspaces, lease) do {:ok, stored_lease} -> {:ok, stored_lease.item_key} diff --git a/test/bedrock/job_queue/store_test.exs b/test/bedrock/job_queue/store_test.exs index 8974923..6390e59 100644 --- a/test/bedrock/job_queue/store_test.exs +++ b/test/bedrock/job_queue/store_test.exs @@ -416,6 +416,35 @@ defmodule Bedrock.JobQueue.StoreTest do end end + describe "complete/3" do + test "uses the stored lease item key when the caller lease is stale" do + now = 10_000 + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{n: 1}, vesting_time: now) + stale_lease = Lease.new(item, "holder", duration_ms: 5_000, now: now) + extended_expires_at = stale_lease.expires_at + 5_000 + current_item_key = {item.priority, extended_expires_at, item.id} + stored_lease = %{stale_lease | expires_at: extended_expires_at, item_key: current_item_key} + + current_item = %{ + item + | lease_id: stale_lease.id, + lease_expires_at: extended_expires_at, + vesting_time: extended_expires_at + } + + {:ok, store} = start_mock_store() + setup_integration_stubs(MockRepo, store) + store_item(store, keyspaces.items, current_item) + MockRepo.put(keyspaces.leases, stale_lease.item_id, :erlang.term_to_binary(stored_lease)) + + assert :ok = Store.complete(MockRepo, root(), stale_lease) + assert MockRepo.get(keyspaces.items, current_item_key) == nil + assert MockRepo.get(keyspaces.leases, stale_lease.item_id) == nil + end + end + describe "requeue/4" do test "uses base_delay for the first retry visibility time" do now = 10_000 @@ -442,6 +471,39 @@ defmodule Bedrock.JobQueue.StoreTest do assert item_id == item.id end + + test "uses the stored lease item key when the caller lease is stale" do + now = 10_000 + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{n: 1}, max_retries: 3, vesting_time: now) + stale_lease = Lease.new(item, "holder", duration_ms: 5_000, now: now) + extended_expires_at = stale_lease.expires_at + 5_000 + current_item_key = {item.priority, extended_expires_at, item.id} + stored_lease = %{stale_lease | expires_at: extended_expires_at, item_key: current_item_key} + + current_item = %{ + item + | lease_id: stale_lease.id, + lease_expires_at: extended_expires_at, + vesting_time: extended_expires_at + } + + {:ok, store} = start_mock_store() + setup_integration_stubs(MockRepo, store) + store_item(store, keyspaces.items, current_item) + MockRepo.put(keyspaces.leases, stale_lease.item_id, :erlang.term_to_binary(stored_lease)) + + assert {:ok, :requeued} = + Store.requeue(MockRepo, root(), stale_lease, now: now, base_delay: 1_000) + + assert MockRepo.get(keyspaces.items, current_item_key) == nil + + assert [%Item{id: item_id, error_count: 1, lease_id: nil}] = + Store.peek(MockRepo, root(), queue_id, now: now + 1_000) + + assert item_id == item.id + end end describe "gc_stale_pointers/3" do From 035512706ab9c773286e78347dea8ae678d69629 Mon Sep 17 00:00:00 2001 From: Cristiano Carvalho Date: Wed, 3 Jun 2026 10:25:50 -0300 Subject: [PATCH 4/6] test: store lease before requeue --- test/bedrock/job_queue/store_test.exs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/bedrock/job_queue/store_test.exs b/test/bedrock/job_queue/store_test.exs index 6390e59..3e4c9af 100644 --- a/test/bedrock/job_queue/store_test.exs +++ b/test/bedrock/job_queue/store_test.exs @@ -463,6 +463,7 @@ defmodule Bedrock.JobQueue.StoreTest do {:ok, store} = start_mock_store() setup_integration_stubs(MockRepo, store) store_item(store, keyspaces.items, leased_item) + MockRepo.put(keyspaces.leases, lease.item_id, :erlang.term_to_binary(lease)) assert {:ok, :requeued} = Store.requeue(MockRepo, root(), lease, now: now, base_delay: 1_000) assert [] = Store.peek(MockRepo, root(), queue_id, now: now + 999) From afa8e3d35961d1bd704f449baddbda6757905b02 Mon Sep 17 00:00:00 2001 From: Cristiano Carvalho Date: Wed, 3 Jun 2026 11:01:08 -0300 Subject: [PATCH 5/6] fix: keep dead letters out of item scans Store dead-lettered jobs in a sibling queue keyspace and filter item scans by valid item key shape so legacy non-item rows cannot crash or block queue scans. Make lease-extender log capture deterministic in CI. --- lib/bedrock/job_queue/store.ex | 37 ++++++-- .../consumer/lease_extender_test.exs | 8 +- test/bedrock/job_queue/store_test.exs | 85 +++++++++++++++++-- test/support/store_test_helpers.ex | 6 +- 4 files changed, 118 insertions(+), 18 deletions(-) diff --git a/lib/bedrock/job_queue/store.ex b/lib/bedrock/job_queue/store.ex index 7b8f096..3452d4f 100644 --- a/lib/bedrock/job_queue/store.ex +++ b/lib/bedrock/job_queue/store.ex @@ -44,6 +44,7 @@ defmodule Bedrock.JobQueue.Store do Returns a map with keyspaces for items, leases, and stats. """ @spec queue_keyspaces(root_keyspace(), String.t()) :: %{ + dead_letter: Keyspace.t(), items: Keyspace.t(), leases: Keyspace.t(), stats: Keyspace.t() @@ -52,6 +53,7 @@ defmodule Bedrock.JobQueue.Store do queue_ks = Keyspace.partition(root, "queues/#{queue_id}/") %{ + dead_letter: Keyspace.partition(queue_ks, "dead_letter/"), items: Keyspace.partition(queue_ks, "items/", key_encoding: TupleEncoding), leases: Keyspace.partition(queue_ks, "leases/"), stats: Keyspace.partition(queue_ks, "stats/") @@ -192,7 +194,7 @@ defmodule Bedrock.JobQueue.Store do # Uses Stream to avoid loading all items into memory # Stops early once we have enough visible items OR hit max_scan keyspaces.items - |> raw_keyspace_range(repo, limit: max_scan) + |> item_keyspace_range(repo, limit: max_scan) |> Stream.map(fn {_key, value} -> decode(value) end) |> Stream.filter(&Item.visible?(&1, now)) |> Enum.take(limit) @@ -537,7 +539,7 @@ defmodule Bedrock.JobQueue.Store do # Scan all items and find minimum vesting_time # Items are sorted by {priority, vesting_time, id}, so we need to check all keyspaces.items - |> raw_keyspace_range(repo, limit: limit) + |> item_keyspace_range(repo, limit: limit) |> Enum.reduce(nil, fn {_key, value}, acc -> item = decode(value) @@ -682,7 +684,7 @@ defmodule Bedrock.JobQueue.Store do defp queue_empty?(repo, root, queue_id) do keyspaces = queue_keyspaces(root, queue_id) - raw_keyspace_range(keyspaces.items, repo, limit: 1) == [] + Enum.empty?(item_keyspace_range(keyspaces.items, repo, limit: 1)) end # Private helpers @@ -715,12 +717,35 @@ defmodule Bedrock.JobQueue.Store do defp decode_timestamp(nil), do: 0 defp decode_timestamp(<>), do: time - defp raw_keyspace_range(keyspace, repo, opts) do + defp item_keyspace_range(keyspace, repo, opts) do prefix = Keyspace.prefix(keyspace) prefix |> Bedrock.KeyRange.from_prefix() |> repo.get_range(opts) + |> Stream.filter(fn {key, _value} -> item_storage_key?(key, prefix) end) + end + + defp item_storage_key?(key, prefix) do + prefix_len = byte_size(prefix) + + case key do + <<^prefix::binary-size(prefix_len), suffix::binary>> -> item_key_suffix?(suffix) + _ -> false + end + end + + defp item_key_suffix?(suffix) do + case TupleEncoding.unpack(suffix) do + {priority, vesting_time, id} + when is_integer(priority) and is_integer(vesting_time) and is_binary(id) -> + true + + _ -> + false + end + rescue + ArgumentError -> false end # Atomically updates pending and processing stats @@ -752,11 +777,9 @@ defmodule Bedrock.JobQueue.Store do defp decode_counter(_), do: 0 defp move_to_dead_letter(repo, keyspaces, item_key, item, now) do - dead_letter_ks = Keyspace.partition(keyspaces.items, "../dead_letter/") - # Write to dead letter with failed_at timestamp dl_key = "#{now}/#{item.id}" - repo.put(dead_letter_ks, dl_key, encode(item)) + repo.put(keyspaces.dead_letter, dl_key, encode(item)) # Delete from main queue and update stats repo.clear(keyspaces.items, item_key) diff --git a/test/bedrock/job_queue/consumer/lease_extender_test.exs b/test/bedrock/job_queue/consumer/lease_extender_test.exs index 1374035..eb33588 100644 --- a/test/bedrock/job_queue/consumer/lease_extender_test.exs +++ b/test/bedrock/job_queue/consumer/lease_extender_test.exs @@ -137,6 +137,8 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do capture_log(fn -> pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 10) assert_receive :done, 100 + Process.sleep(10) + Logger.flush() LeaseExtender.stop(pid) end) @@ -162,6 +164,8 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do capture_log(fn -> pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 10) assert_receive :done, 100 + Process.sleep(10) + Logger.flush() LeaseExtender.stop(pid) end) @@ -189,9 +193,9 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do expires_at: System.system_time(:millisecond) + 30_000 }, 30_000, interval: 10) assert_receive :done, 100 - LeaseExtender.stop(pid) - # Allow time for log to be captured Process.sleep(10) + Logger.flush() + LeaseExtender.stop(pid) end) assert log =~ "Transaction failed extending lease" diff --git a/test/bedrock/job_queue/store_test.exs b/test/bedrock/job_queue/store_test.exs index 3e4c9af..757965d 100644 --- a/test/bedrock/job_queue/store_test.exs +++ b/test/bedrock/job_queue/store_test.exs @@ -29,15 +29,23 @@ defmodule Bedrock.JobQueue.StoreTest do test "creates keyspaces with proper structure" do keyspaces = Store.queue_keyspaces(root(), "tenant_1") - assert %{items: items, leases: leases, stats: stats} = keyspaces + assert %{ + dead_letter: dead_letter, + items: items, + leases: leases, + stats: stats + } = keyspaces + assert dead_letter.key_encoding == nil assert items.key_encoding == TupleEncoding assert leases.key_encoding == nil assert stats.key_encoding == nil # Verify prefix contains expected path components + assert String.contains?(Keyspace.prefix(dead_letter), "dead_letter/") assert String.contains?(Keyspace.prefix(items), "items/") assert String.contains?(Keyspace.prefix(leases), "leases/") assert String.contains?(Keyspace.prefix(stats), "stats/") + refute String.starts_with?(Keyspace.prefix(dead_letter), Keyspace.prefix(items)) end end @@ -81,12 +89,16 @@ defmodule Bedrock.JobQueue.StoreTest do end describe "peek/4 priority ordering" do - test "scans raw key ranges for tuple-encoded item keys" do + test "ignores non-item rows under the item scan prefix" do queue_id = "tenant_1" keyspaces = Store.queue_keyspaces(root(), queue_id) item = Item.new(queue_id, "topic", %{}, priority: 100, vesting_time: 1000) encoded_item = :erlang.term_to_binary(item) packed_item_key = Keyspace.pack(keyspaces.items, Item.key(item)) + legacy_dead_letter_key = + Keyspace.prefix(keyspaces.items) <> + TupleEncoding.pack("../dead_letter/") <> TupleEncoding.pack("1000/#{item.id}") + legacy_dead_letter_item = :erlang.term_to_binary(%{item | id: "dead-lettered"}) expect(MockRepo, :get_range, fn %Keyspace{}, _opts -> @@ -95,7 +107,11 @@ defmodule Bedrock.JobQueue.StoreTest do {start_key, end_key}, _opts when is_binary(start_key) and is_binary(end_key) -> assert packed_item_key >= start_key assert packed_item_key < end_key - [{packed_item_key, encoded_item}] + + [ + {legacy_dead_letter_key, legacy_dead_letter_item}, + {packed_item_key, encoded_item} + ] end) assert [%Item{id: item_id}] = Store.peek(MockRepo, root(), queue_id, limit: 10, now: 2000) @@ -107,12 +123,22 @@ defmodule Bedrock.JobQueue.StoreTest do high_priority = Item.new("tenant_1", "topic", %{}, priority: 10, vesting_time: 1000) medium_priority = Item.new("tenant_1", "topic", %{}, priority: 50, vesting_time: 1000) low_priority = Item.new("tenant_1", "topic", %{}, priority: 200, vesting_time: 1000) + keyspaces = Store.queue_keyspaces(root(), "tenant_1") # Encode items items = [ - {Item.key(low_priority), :erlang.term_to_binary(low_priority)}, - {Item.key(high_priority), :erlang.term_to_binary(high_priority)}, - {Item.key(medium_priority), :erlang.term_to_binary(medium_priority)} + { + Keyspace.pack(keyspaces.items, Item.key(low_priority)), + :erlang.term_to_binary(low_priority) + }, + { + Keyspace.pack(keyspaces.items, Item.key(high_priority)), + :erlang.term_to_binary(high_priority) + }, + { + Keyspace.pack(keyspaces.items, Item.key(medium_priority)), + :erlang.term_to_binary(medium_priority) + } ] # Mock returns items in arbitrary order - peek should sort by key @@ -134,10 +160,17 @@ defmodule Bedrock.JobQueue.StoreTest do Item.new("tenant_1", "topic", %{data: "earlier"}, priority: 100, vesting_time: 1000) later = Item.new("tenant_1", "topic", %{data: "later"}, priority: 100, vesting_time: 2000) + keyspaces = Store.queue_keyspaces(root(), "tenant_1") items = [ - {Item.key(later), :erlang.term_to_binary(later)}, - {Item.key(earlier), :erlang.term_to_binary(earlier)} + { + Keyspace.pack(keyspaces.items, Item.key(later)), + :erlang.term_to_binary(later) + }, + { + Keyspace.pack(keyspaces.items, Item.key(earlier)), + :erlang.term_to_binary(earlier) + } ] expect(MockRepo, :get_range, fn {_start_key, _end_key}, _opts -> @@ -505,6 +538,42 @@ defmodule Bedrock.JobQueue.StoreTest do assert item_id == item.id end + + test "dead letters max-retry items outside the item scan prefix" do + now = 10_000 + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{n: 1}, max_retries: 1, vesting_time: now) + lease = Lease.new(item, "holder", duration_ms: 5_000, now: now) + + leased_item = %{ + item + | lease_id: lease.id, + lease_expires_at: lease.expires_at, + vesting_time: lease.expires_at + } + + {:ok, store} = start_mock_store() + setup_integration_stubs(MockRepo, store) + store_item(store, keyspaces.items, leased_item) + MockRepo.put(keyspaces.leases, lease.item_id, :erlang.term_to_binary(lease)) + + assert {:ok, :dead_lettered} = Store.requeue(MockRepo, root(), lease, now: now) + assert [] = Store.peek(MockRepo, root(), queue_id, now: now + 1_000) + assert MockRepo.get(keyspaces.items, Item.key(leased_item)) == nil + assert MockRepo.get(keyspaces.leases, lease.item_id) == nil + + dead_letter_entries = + Agent.get(store, fn state -> + Enum.filter(state, fn {{prefix, _key}, _value} -> + prefix == Keyspace.prefix(keyspaces.dead_letter) + end) + end) + + assert [{_storage_key, encoded_item}] = dead_letter_entries + assert %Item{id: item_id} = :erlang.binary_to_term(encoded_item) + assert item_id == item.id + end end describe "gc_stale_pointers/3" do diff --git a/test/support/store_test_helpers.ex b/test/support/store_test_helpers.ex index 82fbbdb..f703ea7 100644 --- a/test/support/store_test_helpers.ex +++ b/test/support/store_test_helpers.ex @@ -25,6 +25,7 @@ defmodule Bedrock.JobQueue.Test.StoreHelpers do import ExUnit.Assertions import Mox + alias Bedrock.Encoding.Tuple, as: TupleEncoding alias Bedrock.JobQueue.Item alias Bedrock.JobQueue.Lease alias Bedrock.JobQueue.QueueLease @@ -350,7 +351,10 @@ defmodule Bedrock.JobQueue.Test.StoreHelpers do defp key_in_range?(_, _, _), do: false - defp extract_key_value({{prefix, _k}, v}), do: {prefix, v} + defp extract_key_value({{prefix, key}, v}) when is_tuple(key), + do: {prefix <> TupleEncoding.pack(key), v} + + defp extract_key_value({{prefix, key}, v}) when is_binary(key), do: {prefix <> key, v} defp extract_key_value({k, v}), do: {k, v} @doc """ From a6bb2e710050f21431f9b6513a0067f8089b0040 Mon Sep 17 00:00:00 2001 From: Cristiano Carvalho Date: Wed, 3 Jun 2026 11:08:32 -0300 Subject: [PATCH 6/6] test: avoid lease extender log race Keep the log-capture tests from waiting through a second extender interval while still flushing Logger inside the capture window. --- test/bedrock/job_queue/consumer/lease_extender_test.exs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/bedrock/job_queue/consumer/lease_extender_test.exs b/test/bedrock/job_queue/consumer/lease_extender_test.exs index eb33588..2947aa7 100644 --- a/test/bedrock/job_queue/consumer/lease_extender_test.exs +++ b/test/bedrock/job_queue/consumer/lease_extender_test.exs @@ -135,7 +135,7 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do log = capture_log(fn -> - pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 10) + pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 50) assert_receive :done, 100 Process.sleep(10) Logger.flush() @@ -162,7 +162,7 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do log = capture_log(fn -> - pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 10) + pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 50) assert_receive :done, 100 Process.sleep(10) Logger.flush() @@ -191,7 +191,7 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do holder: @holder_id, obtained_at: System.system_time(:millisecond), expires_at: System.system_time(:millisecond) + 30_000 - }, 30_000, interval: 10) + }, 30_000, interval: 50) assert_receive :done, 100 Process.sleep(10) Logger.flush()