Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions lib/bedrock/job_queue/store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ defmodule Bedrock.JobQueue.Store do
Returns a map with keyspaces for items, leases, and stats.
"""
@spec queue_keyspaces(root_keyspace(), String.t()) :: %{
dead_letter: Keyspace.t(),
items: Keyspace.t(),
leases: Keyspace.t(),
stats: Keyspace.t()
Expand All @@ -52,6 +53,7 @@ defmodule Bedrock.JobQueue.Store do
queue_ks = Keyspace.partition(root, "queues/#{queue_id}/")

%{
dead_letter: Keyspace.partition(queue_ks, "dead_letter/"),
items: Keyspace.partition(queue_ks, "items/", key_encoding: TupleEncoding),
leases: Keyspace.partition(queue_ks, "leases/"),
stats: Keyspace.partition(queue_ks, "stats/")
Expand Down Expand Up @@ -192,7 +194,7 @@ defmodule Bedrock.JobQueue.Store do
# Uses Stream to avoid loading all items into memory
# Stops early once we have enough visible items OR hit max_scan
keyspaces.items
|> repo.get_range(limit: max_scan)
|> item_keyspace_range(repo, limit: max_scan)
|> Stream.map(fn {_key, value} -> decode(value) end)
|> Stream.filter(&Item.visible?(&1, now))
|> Enum.take(limit)
Expand Down Expand Up @@ -387,8 +389,7 @@ defmodule Bedrock.JobQueue.Store do

case verify_lease(repo, keyspaces, lease) do
{:ok, stored_lease} ->
# Use provided item_key or fall back to stored lease's item_key
item_key = lease.item_key || stored_lease.item_key
item_key = stored_lease.item_key
repo.clear(keyspaces.items, item_key)
repo.clear(keyspaces.leases, lease.item_id)

Expand Down Expand Up @@ -430,16 +431,12 @@ defmodule Bedrock.JobQueue.Store do
pointers = pointer_keyspace(root)
now = Keyword.get(opts, :now) || System.system_time(:millisecond)

# Get item_key from lease or fetch from stored lease
with {:ok, item_key} <- resolve_item_key(repo, keyspaces, lease),
{:ok, item} <- fetch_item(repo, keyspaces, item_key) do
do_requeue(repo, keyspaces, pointers, lease, item, item_key, opts, now)
end
end

defp resolve_item_key(_repo, _keyspaces, %Lease{item_key: item_key}) when item_key != nil,
do: {:ok, item_key}

defp resolve_item_key(repo, keyspaces, %Lease{} = lease) do
case verify_lease(repo, keyspaces, lease) do
{:ok, stored_lease} -> {:ok, stored_lease.item_key}
Expand Down Expand Up @@ -542,7 +539,7 @@ defmodule Bedrock.JobQueue.Store do
# Scan all items and find minimum vesting_time
# Items are sorted by {priority, vesting_time, id}, so we need to check all
keyspaces.items
|> repo.get_range(limit: limit)
|> item_keyspace_range(repo, limit: limit)
|> Enum.reduce(nil, fn {_key, value}, acc ->
item = decode(value)

Expand Down Expand Up @@ -687,7 +684,7 @@ defmodule Bedrock.JobQueue.Store do

defp queue_empty?(repo, root, queue_id) do
keyspaces = queue_keyspaces(root, queue_id)
repo.get_range(keyspaces.items, limit: 1) == []
Enum.empty?(item_keyspace_range(keyspaces.items, repo, limit: 1))
end

# Private helpers
Expand Down Expand Up @@ -720,6 +717,37 @@ defmodule Bedrock.JobQueue.Store do
defp decode_timestamp(nil), do: 0
defp decode_timestamp(<<time::64-little>>), do: time

defp item_keyspace_range(keyspace, repo, opts) do
prefix = Keyspace.prefix(keyspace)

prefix
|> Bedrock.KeyRange.from_prefix()
|> repo.get_range(opts)
|> Stream.filter(fn {key, _value} -> item_storage_key?(key, prefix) end)
end

defp item_storage_key?(key, prefix) do
prefix_len = byte_size(prefix)

case key do
<<^prefix::binary-size(prefix_len), suffix::binary>> -> item_key_suffix?(suffix)
_ -> false
end
end

defp item_key_suffix?(suffix) do
case TupleEncoding.unpack(suffix) do
{priority, vesting_time, id}
when is_integer(priority) and is_integer(vesting_time) and is_binary(id) ->
true

_ ->
false
end
rescue
ArgumentError -> false
end

# Atomically updates pending and processing stats
defp update_stats(repo, keyspaces, pending_delta, processing_delta) do
if pending_delta != 0 do
Expand Down Expand Up @@ -749,11 +777,9 @@ defmodule Bedrock.JobQueue.Store do
defp decode_counter(_), do: 0

defp move_to_dead_letter(repo, keyspaces, item_key, item, now) do
dead_letter_ks = Keyspace.partition(keyspaces.items, "../dead_letter/")

# Write to dead letter with failed_at timestamp
dl_key = "#{now}/#{item.id}"
repo.put(dead_letter_ks, dl_key, encode(item))
repo.put(keyspaces.dead_letter, dl_key, encode(item))

# Delete from main queue and update stats
repo.clear(keyspaces.items, item_key)
Expand Down
14 changes: 9 additions & 5 deletions test/bedrock/job_queue/consumer/lease_extender_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do

log =
capture_log(fn ->
pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 10)
pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 50)
assert_receive :done, 100
Process.sleep(10)
Logger.flush()
LeaseExtender.stop(pid)
end)

Expand All @@ -160,8 +162,10 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do

log =
capture_log(fn ->
pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 10)
pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 50)
assert_receive :done, 100
Process.sleep(10)
Logger.flush()
LeaseExtender.stop(pid)
end)

Expand All @@ -187,11 +191,11 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do
holder: @holder_id,
obtained_at: System.system_time(:millisecond),
expires_at: System.system_time(:millisecond) + 30_000
}, 30_000, interval: 10)
}, 30_000, interval: 50)
assert_receive :done, 100
LeaseExtender.stop(pid)
# Allow time for log to be captured
Process.sleep(10)
Logger.flush()
LeaseExtender.stop(pid)
end)

assert log =~ "Transaction failed extending lease"
Expand Down
169 changes: 161 additions & 8 deletions test/bedrock/job_queue/store_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,23 @@ defmodule Bedrock.JobQueue.StoreTest do
test "creates keyspaces with proper structure" do
keyspaces = Store.queue_keyspaces(root(), "tenant_1")

assert %{items: items, leases: leases, stats: stats} = keyspaces
assert %{
dead_letter: dead_letter,
items: items,
leases: leases,
stats: stats
} = keyspaces
assert dead_letter.key_encoding == nil
assert items.key_encoding == TupleEncoding
assert leases.key_encoding == nil
assert stats.key_encoding == nil

# Verify prefix contains expected path components
assert String.contains?(Keyspace.prefix(dead_letter), "dead_letter/")
assert String.contains?(Keyspace.prefix(items), "items/")
assert String.contains?(Keyspace.prefix(leases), "leases/")
assert String.contains?(Keyspace.prefix(stats), "stats/")
refute String.starts_with?(Keyspace.prefix(dead_letter), Keyspace.prefix(items))
end
end

Expand Down Expand Up @@ -81,21 +89,60 @@ defmodule Bedrock.JobQueue.StoreTest do
end

describe "peek/4 priority ordering" do
test "ignores non-item rows under the item scan prefix" do
queue_id = "tenant_1"
keyspaces = Store.queue_keyspaces(root(), queue_id)
item = Item.new(queue_id, "topic", %{}, priority: 100, vesting_time: 1000)
encoded_item = :erlang.term_to_binary(item)
packed_item_key = Keyspace.pack(keyspaces.items, Item.key(item))
legacy_dead_letter_key =
Keyspace.prefix(keyspaces.items) <>
TupleEncoding.pack("../dead_letter/") <> TupleEncoding.pack("1000/#{item.id}")
legacy_dead_letter_item = :erlang.term_to_binary(%{item | id: "dead-lettered"})

expect(MockRepo, :get_range, fn
%Keyspace{}, _opts ->
flunk("tuple-encoded item keyspaces must be scanned as raw ranges")

{start_key, end_key}, _opts when is_binary(start_key) and is_binary(end_key) ->
assert packed_item_key >= start_key
assert packed_item_key < end_key

[
{legacy_dead_letter_key, legacy_dead_letter_item},
{packed_item_key, encoded_item}
]
end)

assert [%Item{id: item_id}] = Store.peek(MockRepo, root(), queue_id, limit: 10, now: 2000)
assert item_id == item.id
end

test "returns items in priority order (lowest number first)" do
# Create items with different priorities
high_priority = Item.new("tenant_1", "topic", %{}, priority: 10, vesting_time: 1000)
medium_priority = Item.new("tenant_1", "topic", %{}, priority: 50, vesting_time: 1000)
low_priority = Item.new("tenant_1", "topic", %{}, priority: 200, vesting_time: 1000)
keyspaces = Store.queue_keyspaces(root(), "tenant_1")

# Encode items
items = [
{Item.key(low_priority), :erlang.term_to_binary(low_priority)},
{Item.key(high_priority), :erlang.term_to_binary(high_priority)},
{Item.key(medium_priority), :erlang.term_to_binary(medium_priority)}
{
Keyspace.pack(keyspaces.items, Item.key(low_priority)),
:erlang.term_to_binary(low_priority)
},
{
Keyspace.pack(keyspaces.items, Item.key(high_priority)),
:erlang.term_to_binary(high_priority)
},
{
Keyspace.pack(keyspaces.items, Item.key(medium_priority)),
:erlang.term_to_binary(medium_priority)
}
]

# Mock returns items in arbitrary order - peek should sort by key
expect(MockRepo, :get_range, fn %Keyspace{}, _opts ->
expect(MockRepo, :get_range, fn {_start_key, _end_key}, _opts ->
# Return sorted by key (simulating DB behavior)
Enum.sort_by(items, fn {key, _} -> key end)
end)
Expand All @@ -113,13 +160,20 @@ defmodule Bedrock.JobQueue.StoreTest do
Item.new("tenant_1", "topic", %{data: "earlier"}, priority: 100, vesting_time: 1000)

later = Item.new("tenant_1", "topic", %{data: "later"}, priority: 100, vesting_time: 2000)
keyspaces = Store.queue_keyspaces(root(), "tenant_1")

items = [
{Item.key(later), :erlang.term_to_binary(later)},
{Item.key(earlier), :erlang.term_to_binary(earlier)}
{
Keyspace.pack(keyspaces.items, Item.key(later)),
:erlang.term_to_binary(later)
},
{
Keyspace.pack(keyspaces.items, Item.key(earlier)),
:erlang.term_to_binary(earlier)
}
]

expect(MockRepo, :get_range, fn %Keyspace{}, _opts ->
expect(MockRepo, :get_range, fn {_start_key, _end_key}, _opts ->
Enum.sort_by(items, fn {key, _} -> key end)
end)

Expand Down Expand Up @@ -395,6 +449,35 @@ defmodule Bedrock.JobQueue.StoreTest do
end
end

describe "complete/3" do
test "uses the stored lease item key when the caller lease is stale" do
now = 10_000
queue_id = "tenant_1"
keyspaces = Store.queue_keyspaces(root(), queue_id)
item = Item.new(queue_id, "topic", %{n: 1}, vesting_time: now)
stale_lease = Lease.new(item, "holder", duration_ms: 5_000, now: now)
extended_expires_at = stale_lease.expires_at + 5_000
current_item_key = {item.priority, extended_expires_at, item.id}
stored_lease = %{stale_lease | expires_at: extended_expires_at, item_key: current_item_key}

current_item = %{
item
| lease_id: stale_lease.id,
lease_expires_at: extended_expires_at,
vesting_time: extended_expires_at
}

{:ok, store} = start_mock_store()
setup_integration_stubs(MockRepo, store)
store_item(store, keyspaces.items, current_item)
MockRepo.put(keyspaces.leases, stale_lease.item_id, :erlang.term_to_binary(stored_lease))

assert :ok = Store.complete(MockRepo, root(), stale_lease)
assert MockRepo.get(keyspaces.items, current_item_key) == nil
assert MockRepo.get(keyspaces.leases, stale_lease.item_id) == nil
end
end

describe "requeue/4" do
test "uses base_delay for the first retry visibility time" do
now = 10_000
Expand All @@ -413,6 +496,7 @@ defmodule Bedrock.JobQueue.StoreTest do
{:ok, store} = start_mock_store()
setup_integration_stubs(MockRepo, store)
store_item(store, keyspaces.items, leased_item)
MockRepo.put(keyspaces.leases, lease.item_id, :erlang.term_to_binary(lease))

assert {:ok, :requeued} = Store.requeue(MockRepo, root(), lease, now: now, base_delay: 1_000)
assert [] = Store.peek(MockRepo, root(), queue_id, now: now + 999)
Expand All @@ -421,6 +505,75 @@ defmodule Bedrock.JobQueue.StoreTest do

assert item_id == item.id
end

test "uses the stored lease item key when the caller lease is stale" do
now = 10_000
queue_id = "tenant_1"
keyspaces = Store.queue_keyspaces(root(), queue_id)
item = Item.new(queue_id, "topic", %{n: 1}, max_retries: 3, vesting_time: now)
stale_lease = Lease.new(item, "holder", duration_ms: 5_000, now: now)
extended_expires_at = stale_lease.expires_at + 5_000
current_item_key = {item.priority, extended_expires_at, item.id}
stored_lease = %{stale_lease | expires_at: extended_expires_at, item_key: current_item_key}

current_item = %{
item
| lease_id: stale_lease.id,
lease_expires_at: extended_expires_at,
vesting_time: extended_expires_at
}

{:ok, store} = start_mock_store()
setup_integration_stubs(MockRepo, store)
store_item(store, keyspaces.items, current_item)
MockRepo.put(keyspaces.leases, stale_lease.item_id, :erlang.term_to_binary(stored_lease))

assert {:ok, :requeued} =
Store.requeue(MockRepo, root(), stale_lease, now: now, base_delay: 1_000)

assert MockRepo.get(keyspaces.items, current_item_key) == nil

assert [%Item{id: item_id, error_count: 1, lease_id: nil}] =
Store.peek(MockRepo, root(), queue_id, now: now + 1_000)

assert item_id == item.id
end

test "dead letters max-retry items outside the item scan prefix" do
now = 10_000
queue_id = "tenant_1"
keyspaces = Store.queue_keyspaces(root(), queue_id)
item = Item.new(queue_id, "topic", %{n: 1}, max_retries: 1, vesting_time: now)
lease = Lease.new(item, "holder", duration_ms: 5_000, now: now)

leased_item = %{
item
| lease_id: lease.id,
lease_expires_at: lease.expires_at,
vesting_time: lease.expires_at
}

{:ok, store} = start_mock_store()
setup_integration_stubs(MockRepo, store)
store_item(store, keyspaces.items, leased_item)
MockRepo.put(keyspaces.leases, lease.item_id, :erlang.term_to_binary(lease))

assert {:ok, :dead_lettered} = Store.requeue(MockRepo, root(), lease, now: now)
assert [] = Store.peek(MockRepo, root(), queue_id, now: now + 1_000)
assert MockRepo.get(keyspaces.items, Item.key(leased_item)) == nil
assert MockRepo.get(keyspaces.leases, lease.item_id) == nil

dead_letter_entries =
Agent.get(store, fn state ->
Enum.filter(state, fn {{prefix, _key}, _value} ->
prefix == Keyspace.prefix(keyspaces.dead_letter)
end)
end)

assert [{_storage_key, encoded_item}] = dead_letter_entries
assert %Item{id: item_id} = :erlang.binary_to_term(encoded_item)
assert item_id == item.id
end
end

describe "gc_stale_pointers/3" do
Expand Down
Loading
Loading