diff --git a/lib/bedrock/job_queue/store.ex b/lib/bedrock/job_queue/store.ex index 5d25df8..3452d4f 100644 --- a/lib/bedrock/job_queue/store.ex +++ b/lib/bedrock/job_queue/store.ex @@ -44,6 +44,7 @@ defmodule Bedrock.JobQueue.Store do Returns a map with keyspaces for items, leases, and stats. """ @spec queue_keyspaces(root_keyspace(), String.t()) :: %{ + dead_letter: Keyspace.t(), items: Keyspace.t(), leases: Keyspace.t(), stats: Keyspace.t() @@ -52,6 +53,7 @@ defmodule Bedrock.JobQueue.Store do queue_ks = Keyspace.partition(root, "queues/#{queue_id}/") %{ + dead_letter: Keyspace.partition(queue_ks, "dead_letter/"), items: Keyspace.partition(queue_ks, "items/", key_encoding: TupleEncoding), leases: Keyspace.partition(queue_ks, "leases/"), stats: Keyspace.partition(queue_ks, "stats/") @@ -192,7 +194,7 @@ defmodule Bedrock.JobQueue.Store do # Uses Stream to avoid loading all items into memory # Stops early once we have enough visible items OR hit max_scan keyspaces.items - |> repo.get_range(limit: max_scan) + |> item_keyspace_range(repo, limit: max_scan) |> Stream.map(fn {_key, value} -> decode(value) end) |> Stream.filter(&Item.visible?(&1, now)) |> Enum.take(limit) @@ -387,8 +389,7 @@ defmodule Bedrock.JobQueue.Store do case verify_lease(repo, keyspaces, lease) do {:ok, stored_lease} -> - # Use provided item_key or fall back to stored lease's item_key - item_key = lease.item_key || stored_lease.item_key + item_key = stored_lease.item_key repo.clear(keyspaces.items, item_key) repo.clear(keyspaces.leases, lease.item_id) @@ -430,16 +431,12 @@ defmodule Bedrock.JobQueue.Store do pointers = pointer_keyspace(root) now = Keyword.get(opts, :now) || System.system_time(:millisecond) - # Get item_key from lease or fetch from stored lease with {:ok, item_key} <- resolve_item_key(repo, keyspaces, lease), {:ok, item} <- fetch_item(repo, keyspaces, item_key) do do_requeue(repo, keyspaces, pointers, lease, item, item_key, opts, now) end end - defp resolve_item_key(_repo, _keyspaces, %Lease{item_key: item_key}) when item_key != nil, - do: {:ok, item_key} - defp resolve_item_key(repo, keyspaces, %Lease{} = lease) do case verify_lease(repo, keyspaces, lease) do {:ok, stored_lease} -> {:ok, stored_lease.item_key} @@ -542,7 +539,7 @@ defmodule Bedrock.JobQueue.Store do # Scan all items and find minimum vesting_time # Items are sorted by {priority, vesting_time, id}, so we need to check all keyspaces.items - |> repo.get_range(limit: limit) + |> item_keyspace_range(repo, limit: limit) |> Enum.reduce(nil, fn {_key, value}, acc -> item = decode(value) @@ -687,7 +684,7 @@ defmodule Bedrock.JobQueue.Store do defp queue_empty?(repo, root, queue_id) do keyspaces = queue_keyspaces(root, queue_id) - repo.get_range(keyspaces.items, limit: 1) == [] + Enum.empty?(item_keyspace_range(keyspaces.items, repo, limit: 1)) end # Private helpers @@ -720,6 +717,37 @@ defmodule Bedrock.JobQueue.Store do defp decode_timestamp(nil), do: 0 defp decode_timestamp(<>), do: time + defp item_keyspace_range(keyspace, repo, opts) do + prefix = Keyspace.prefix(keyspace) + + prefix + |> Bedrock.KeyRange.from_prefix() + |> repo.get_range(opts) + |> Stream.filter(fn {key, _value} -> item_storage_key?(key, prefix) end) + end + + defp item_storage_key?(key, prefix) do + prefix_len = byte_size(prefix) + + case key do + <<^prefix::binary-size(prefix_len), suffix::binary>> -> item_key_suffix?(suffix) + _ -> false + end + end + + defp item_key_suffix?(suffix) do + case TupleEncoding.unpack(suffix) do + {priority, vesting_time, id} + when is_integer(priority) and is_integer(vesting_time) and is_binary(id) -> + true + + _ -> + false + end + rescue + ArgumentError -> false + end + # Atomically updates pending and processing stats defp update_stats(repo, keyspaces, pending_delta, processing_delta) do if pending_delta != 0 do @@ -749,11 +777,9 @@ defmodule Bedrock.JobQueue.Store do defp decode_counter(_), do: 0 defp move_to_dead_letter(repo, keyspaces, item_key, item, now) do - dead_letter_ks = Keyspace.partition(keyspaces.items, "../dead_letter/") - # Write to dead letter with failed_at timestamp dl_key = "#{now}/#{item.id}" - repo.put(dead_letter_ks, dl_key, encode(item)) + repo.put(keyspaces.dead_letter, dl_key, encode(item)) # Delete from main queue and update stats repo.clear(keyspaces.items, item_key) diff --git a/test/bedrock/job_queue/consumer/lease_extender_test.exs b/test/bedrock/job_queue/consumer/lease_extender_test.exs index 1374035..2947aa7 100644 --- a/test/bedrock/job_queue/consumer/lease_extender_test.exs +++ b/test/bedrock/job_queue/consumer/lease_extender_test.exs @@ -135,8 +135,10 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do log = capture_log(fn -> - pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 10) + pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 50) assert_receive :done, 100 + Process.sleep(10) + Logger.flush() LeaseExtender.stop(pid) end) @@ -160,8 +162,10 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do log = capture_log(fn -> - pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 10) + pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 50) assert_receive :done, 100 + Process.sleep(10) + Logger.flush() LeaseExtender.stop(pid) end) @@ -187,11 +191,11 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do holder: @holder_id, obtained_at: System.system_time(:millisecond), expires_at: System.system_time(:millisecond) + 30_000 - }, 30_000, interval: 10) + }, 30_000, interval: 50) assert_receive :done, 100 - LeaseExtender.stop(pid) - # Allow time for log to be captured Process.sleep(10) + Logger.flush() + LeaseExtender.stop(pid) end) assert log =~ "Transaction failed extending lease" diff --git a/test/bedrock/job_queue/store_test.exs b/test/bedrock/job_queue/store_test.exs index 3222c0b..757965d 100644 --- a/test/bedrock/job_queue/store_test.exs +++ b/test/bedrock/job_queue/store_test.exs @@ -29,15 +29,23 @@ defmodule Bedrock.JobQueue.StoreTest do test "creates keyspaces with proper structure" do keyspaces = Store.queue_keyspaces(root(), "tenant_1") - assert %{items: items, leases: leases, stats: stats} = keyspaces + assert %{ + dead_letter: dead_letter, + items: items, + leases: leases, + stats: stats + } = keyspaces + assert dead_letter.key_encoding == nil assert items.key_encoding == TupleEncoding assert leases.key_encoding == nil assert stats.key_encoding == nil # Verify prefix contains expected path components + assert String.contains?(Keyspace.prefix(dead_letter), "dead_letter/") assert String.contains?(Keyspace.prefix(items), "items/") assert String.contains?(Keyspace.prefix(leases), "leases/") assert String.contains?(Keyspace.prefix(stats), "stats/") + refute String.starts_with?(Keyspace.prefix(dead_letter), Keyspace.prefix(items)) end end @@ -81,21 +89,60 @@ defmodule Bedrock.JobQueue.StoreTest do end describe "peek/4 priority ordering" do + test "ignores non-item rows under the item scan prefix" do + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{}, priority: 100, vesting_time: 1000) + encoded_item = :erlang.term_to_binary(item) + packed_item_key = Keyspace.pack(keyspaces.items, Item.key(item)) + legacy_dead_letter_key = + Keyspace.prefix(keyspaces.items) <> + TupleEncoding.pack("../dead_letter/") <> TupleEncoding.pack("1000/#{item.id}") + legacy_dead_letter_item = :erlang.term_to_binary(%{item | id: "dead-lettered"}) + + expect(MockRepo, :get_range, fn + %Keyspace{}, _opts -> + flunk("tuple-encoded item keyspaces must be scanned as raw ranges") + + {start_key, end_key}, _opts when is_binary(start_key) and is_binary(end_key) -> + assert packed_item_key >= start_key + assert packed_item_key < end_key + + [ + {legacy_dead_letter_key, legacy_dead_letter_item}, + {packed_item_key, encoded_item} + ] + end) + + assert [%Item{id: item_id}] = Store.peek(MockRepo, root(), queue_id, limit: 10, now: 2000) + assert item_id == item.id + end + test "returns items in priority order (lowest number first)" do # Create items with different priorities high_priority = Item.new("tenant_1", "topic", %{}, priority: 10, vesting_time: 1000) medium_priority = Item.new("tenant_1", "topic", %{}, priority: 50, vesting_time: 1000) low_priority = Item.new("tenant_1", "topic", %{}, priority: 200, vesting_time: 1000) + keyspaces = Store.queue_keyspaces(root(), "tenant_1") # Encode items items = [ - {Item.key(low_priority), :erlang.term_to_binary(low_priority)}, - {Item.key(high_priority), :erlang.term_to_binary(high_priority)}, - {Item.key(medium_priority), :erlang.term_to_binary(medium_priority)} + { + Keyspace.pack(keyspaces.items, Item.key(low_priority)), + :erlang.term_to_binary(low_priority) + }, + { + Keyspace.pack(keyspaces.items, Item.key(high_priority)), + :erlang.term_to_binary(high_priority) + }, + { + Keyspace.pack(keyspaces.items, Item.key(medium_priority)), + :erlang.term_to_binary(medium_priority) + } ] # Mock returns items in arbitrary order - peek should sort by key - expect(MockRepo, :get_range, fn %Keyspace{}, _opts -> + expect(MockRepo, :get_range, fn {_start_key, _end_key}, _opts -> # Return sorted by key (simulating DB behavior) Enum.sort_by(items, fn {key, _} -> key end) end) @@ -113,13 +160,20 @@ defmodule Bedrock.JobQueue.StoreTest do Item.new("tenant_1", "topic", %{data: "earlier"}, priority: 100, vesting_time: 1000) later = Item.new("tenant_1", "topic", %{data: "later"}, priority: 100, vesting_time: 2000) + keyspaces = Store.queue_keyspaces(root(), "tenant_1") items = [ - {Item.key(later), :erlang.term_to_binary(later)}, - {Item.key(earlier), :erlang.term_to_binary(earlier)} + { + Keyspace.pack(keyspaces.items, Item.key(later)), + :erlang.term_to_binary(later) + }, + { + Keyspace.pack(keyspaces.items, Item.key(earlier)), + :erlang.term_to_binary(earlier) + } ] - expect(MockRepo, :get_range, fn %Keyspace{}, _opts -> + expect(MockRepo, :get_range, fn {_start_key, _end_key}, _opts -> Enum.sort_by(items, fn {key, _} -> key end) end) @@ -395,6 +449,35 @@ defmodule Bedrock.JobQueue.StoreTest do end end + describe "complete/3" do + test "uses the stored lease item key when the caller lease is stale" do + now = 10_000 + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{n: 1}, vesting_time: now) + stale_lease = Lease.new(item, "holder", duration_ms: 5_000, now: now) + extended_expires_at = stale_lease.expires_at + 5_000 + current_item_key = {item.priority, extended_expires_at, item.id} + stored_lease = %{stale_lease | expires_at: extended_expires_at, item_key: current_item_key} + + current_item = %{ + item + | lease_id: stale_lease.id, + lease_expires_at: extended_expires_at, + vesting_time: extended_expires_at + } + + {:ok, store} = start_mock_store() + setup_integration_stubs(MockRepo, store) + store_item(store, keyspaces.items, current_item) + MockRepo.put(keyspaces.leases, stale_lease.item_id, :erlang.term_to_binary(stored_lease)) + + assert :ok = Store.complete(MockRepo, root(), stale_lease) + assert MockRepo.get(keyspaces.items, current_item_key) == nil + assert MockRepo.get(keyspaces.leases, stale_lease.item_id) == nil + end + end + describe "requeue/4" do test "uses base_delay for the first retry visibility time" do now = 10_000 @@ -413,6 +496,7 @@ defmodule Bedrock.JobQueue.StoreTest do {:ok, store} = start_mock_store() setup_integration_stubs(MockRepo, store) store_item(store, keyspaces.items, leased_item) + MockRepo.put(keyspaces.leases, lease.item_id, :erlang.term_to_binary(lease)) assert {:ok, :requeued} = Store.requeue(MockRepo, root(), lease, now: now, base_delay: 1_000) assert [] = Store.peek(MockRepo, root(), queue_id, now: now + 999) @@ -421,6 +505,75 @@ defmodule Bedrock.JobQueue.StoreTest do assert item_id == item.id end + + test "uses the stored lease item key when the caller lease is stale" do + now = 10_000 + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{n: 1}, max_retries: 3, vesting_time: now) + stale_lease = Lease.new(item, "holder", duration_ms: 5_000, now: now) + extended_expires_at = stale_lease.expires_at + 5_000 + current_item_key = {item.priority, extended_expires_at, item.id} + stored_lease = %{stale_lease | expires_at: extended_expires_at, item_key: current_item_key} + + current_item = %{ + item + | lease_id: stale_lease.id, + lease_expires_at: extended_expires_at, + vesting_time: extended_expires_at + } + + {:ok, store} = start_mock_store() + setup_integration_stubs(MockRepo, store) + store_item(store, keyspaces.items, current_item) + MockRepo.put(keyspaces.leases, stale_lease.item_id, :erlang.term_to_binary(stored_lease)) + + assert {:ok, :requeued} = + Store.requeue(MockRepo, root(), stale_lease, now: now, base_delay: 1_000) + + assert MockRepo.get(keyspaces.items, current_item_key) == nil + + assert [%Item{id: item_id, error_count: 1, lease_id: nil}] = + Store.peek(MockRepo, root(), queue_id, now: now + 1_000) + + assert item_id == item.id + end + + test "dead letters max-retry items outside the item scan prefix" do + now = 10_000 + queue_id = "tenant_1" + keyspaces = Store.queue_keyspaces(root(), queue_id) + item = Item.new(queue_id, "topic", %{n: 1}, max_retries: 1, vesting_time: now) + lease = Lease.new(item, "holder", duration_ms: 5_000, now: now) + + leased_item = %{ + item + | lease_id: lease.id, + lease_expires_at: lease.expires_at, + vesting_time: lease.expires_at + } + + {:ok, store} = start_mock_store() + setup_integration_stubs(MockRepo, store) + store_item(store, keyspaces.items, leased_item) + MockRepo.put(keyspaces.leases, lease.item_id, :erlang.term_to_binary(lease)) + + assert {:ok, :dead_lettered} = Store.requeue(MockRepo, root(), lease, now: now) + assert [] = Store.peek(MockRepo, root(), queue_id, now: now + 1_000) + assert MockRepo.get(keyspaces.items, Item.key(leased_item)) == nil + assert MockRepo.get(keyspaces.leases, lease.item_id) == nil + + dead_letter_entries = + Agent.get(store, fn state -> + Enum.filter(state, fn {{prefix, _key}, _value} -> + prefix == Keyspace.prefix(keyspaces.dead_letter) + end) + end) + + assert [{_storage_key, encoded_item}] = dead_letter_entries + assert %Item{id: item_id} = :erlang.binary_to_term(encoded_item) + assert item_id == item.id + end end describe "gc_stale_pointers/3" do diff --git a/test/support/store_test_helpers.ex b/test/support/store_test_helpers.ex index 97d45cf..f703ea7 100644 --- a/test/support/store_test_helpers.ex +++ b/test/support/store_test_helpers.ex @@ -25,9 +25,11 @@ defmodule Bedrock.JobQueue.Test.StoreHelpers do import ExUnit.Assertions import Mox + alias Bedrock.Encoding.Tuple, as: TupleEncoding alias Bedrock.JobQueue.Item alias Bedrock.JobQueue.Lease alias Bedrock.JobQueue.QueueLease + alias Bedrock.JobQueue.Store alias Bedrock.Keyspace # ============================================================================ @@ -123,17 +125,19 @@ defmodule Bedrock.JobQueue.Test.StoreHelpers do # ============================================================================ @doc """ - Expects a get_range on the items keyspace for a specific queue_id. - Verifies the keyspace contains items path and returns the given items. + Expects a raw get_range over the items keyspace for a specific queue_id. + Verifies the key range contains the queue items path and returns the given items. Items should be a list of `{key, encoded_value}` tuples. """ def expect_peek(repo, queue_id, items) do - expect(repo, :get_range, fn %Keyspace{} = ks, opts -> - prefix = Keyspace.prefix(ks) + root = Keyspace.new("job_queue/") + keyspaces = Store.queue_keyspaces(root, queue_id) + prefix = Keyspace.prefix(keyspaces.items) - assert String.contains?(prefix, "queues/#{queue_id}/items/"), - "Expected items keyspace for queue #{queue_id}, got: #{prefix}" + expect(repo, :get_range, fn {start_key, end_key}, opts -> + assert start_key == prefix + assert end_key > prefix assert is_list(opts), "Expected opts to be a list" items @@ -347,7 +351,10 @@ defmodule Bedrock.JobQueue.Test.StoreHelpers do defp key_in_range?(_, _, _), do: false - defp extract_key_value({{prefix, _k}, v}), do: {prefix, v} + defp extract_key_value({{prefix, key}, v}) when is_tuple(key), + do: {prefix <> TupleEncoding.pack(key), v} + + defp extract_key_value({{prefix, key}, v}) when is_binary(key), do: {prefix <> key, v} defp extract_key_value({k, v}), do: {k, v} @doc """