Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion lib/bedrock/job_queue/item.ex
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,23 @@ defmodule Bedrock.JobQueue.Item do
end

@doc """
Returns true if the job is currently visible (vesting_time has passed and not leased).
Returns true if the job is currently visible.

An item is visible when its vesting time has passed and it is not actively
leased. Expired leases are considered visible so another worker can reclaim
stale work.
"""
@spec visible?(t()) :: boolean()
@spec visible?(t(), non_neg_integer()) :: boolean()
def visible?(item, now \\ System.system_time(:millisecond))

def visible?(%__MODULE__{vesting_time: vt, lease_id: nil}, now), do: now >= vt

def visible?(%__MODULE__{vesting_time: vt, lease_expires_at: exp}, now)
when not is_nil(exp) do
now >= vt and now >= exp
end

def visible?(%__MODULE__{}, _now), do: false

@doc """
Expand Down
55 changes: 28 additions & 27 deletions lib/bedrock/job_queue/store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -271,38 +271,39 @@ defmodule Bedrock.JobQueue.Store do
value ->
current_item = decode(value)

if current_item.lease_id == nil do
# Create lease
lease = Lease.new(current_item, holder, duration_ms: duration_ms, now: now)
lease_expires_at = now + duration_ms
if Item.leased?(current_item, now: now) do
{:error, :already_leased}
else
do_obtain_lease(repo, keyspaces, pointers, current_item, holder, duration_ms, now)
end
end
end

# Update item with lease info and new vesting_time
updated_item = %{
current_item
| lease_id: lease.id,
lease_expires_at: lease_expires_at,
vesting_time: lease_expires_at
}
defp do_obtain_lease(repo, keyspaces, pointers, current_item, holder, duration_ms, now) do
lease = Lease.new(current_item, holder, duration_ms: duration_ms, now: now)
lease_expires_at = now + duration_ms
pending_item? = current_item.lease_id == nil

# Delete old item key (vesting_time changed)
repo.clear(keyspaces.items, item_key)
updated_item = %{
current_item
| lease_id: lease.id,
lease_expires_at: lease_expires_at,
vesting_time: lease_expires_at
}

# Write with new key (new vesting_time)
new_item_key = Item.key(updated_item)
repo.put(keyspaces.items, new_item_key, encode(updated_item))
repo.clear(keyspaces.items, Item.key(current_item))

# Write lease record
repo.put(keyspaces.leases, lease.item_id, encode(lease))
new_item_key = Item.key(updated_item)
repo.put(keyspaces.items, new_item_key, encode(updated_item))
repo.put(keyspaces.leases, lease.item_id, encode(lease))

# Update pointer and stats
update_pointer(repo, pointers, lease_expires_at, item.queue_id, now)
update_stats(repo, keyspaces, -1, 1)
update_pointer(repo, pointers, lease_expires_at, current_item.queue_id, now)

{:ok, lease}
else
{:error, :already_leased}
end
if pending_item? do
update_stats(repo, keyspaces, -1, 1)
end

{:ok, lease}
end

@doc """
Expand Down Expand Up @@ -497,13 +498,13 @@ defmodule Bedrock.JobQueue.Store do
# Explicit base_delay overrides backoff_fn (used by snooze)
base_delay = Keyword.get(opts, :base_delay) ->
max_delay = Keyword.get(opts, :max_delay, 60_000)
(base_delay * :math.pow(2, error_count)) |> trunc() |> min(max_delay)
(base_delay * :math.pow(2, error_count - 1)) |> trunc() |> min(max_delay)

backoff_fn = Keyword.get(opts, :backoff_fn) ->
backoff_fn.(error_count)

true ->
(1000 * :math.pow(2, error_count)) |> trunc() |> min(60_000)
(1000 * :math.pow(2, error_count - 1)) |> trunc() |> min(60_000)
end
end

Expand Down
68 changes: 68 additions & 0 deletions test/bedrock/job_queue/store_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ defmodule Bedrock.JobQueue.StoreTest do

refute Item.visible?(item, now)
end

test "items with expired lease are visible again" do
now = 10_000

item = %{
Item.new("queue", "topic", %{}, vesting_time: 9_000)
| lease_id: <<1, 2, 3>>,
lease_expires_at: 9_000
}

assert Item.visible?(item, now)
end
end

describe "Lease creation" do
Expand Down Expand Up @@ -353,6 +365,62 @@ defmodule Bedrock.JobQueue.StoreTest do

assert {:ok, []} = result
end

test "reclaims items whose lease expired" do
now = 10_000
queue_id = "tenant_1"
keyspaces = Store.queue_keyspaces(root(), queue_id)
item = Item.new(queue_id, "topic", %{n: 1}, vesting_time: 8_000)
expired_lease = Lease.new(item, "old_holder", duration_ms: 1_000, now: 8_000)

leased_item = %{
item
| lease_id: expired_lease.id,
lease_expires_at: expired_lease.expires_at,
vesting_time: expired_lease.expires_at
}

{:ok, store} = start_mock_store()
setup_integration_stubs(MockRepo, store)
store_item(store, keyspaces.items, leased_item)

assert [%Item{id: item_id}] = Store.peek(MockRepo, root(), queue_id, now: now)
assert item_id == item.id

assert {:ok, [%Lease{holder: "new_holder"}]} =
Store.dequeue(MockRepo, root(), queue_id, "new_holder",
now: now,
lease_duration: 5_000
)
end
end

describe "requeue/4" do
test "uses base_delay for the first retry visibility time" do
now = 10_000
queue_id = "tenant_1"
keyspaces = Store.queue_keyspaces(root(), queue_id)
item = Item.new(queue_id, "topic", %{n: 1}, max_retries: 3, vesting_time: now)
lease = Lease.new(item, "holder", duration_ms: 5_000, now: now)

leased_item = %{
item
| lease_id: lease.id,
lease_expires_at: lease.expires_at,
vesting_time: lease.expires_at
}

{:ok, store} = start_mock_store()
setup_integration_stubs(MockRepo, store)
store_item(store, keyspaces.items, leased_item)

assert {:ok, :requeued} = Store.requeue(MockRepo, root(), lease, now: now, base_delay: 1_000)
assert [] = Store.peek(MockRepo, root(), queue_id, now: now + 999)
assert [%Item{id: item_id, error_count: 1}] =
Store.peek(MockRepo, root(), queue_id, now: now + 1_000)

assert item_id == item.id
end
end

describe "gc_stale_pointers/3" do
Expand Down
Loading