Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/bedrock/job_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ defmodule Bedrock.JobQueue do
- `:otp_app` - The OTP application name (required)
- `:repo` - The Bedrock Repo module (required)
- `:workers` - Map of topic strings to job modules (default: %{})
- `:on_action` - Optional `{module, function, extra_args}` hook invoked in
the same transaction as queue completion/requeue actions

## Example

Expand All @@ -104,6 +106,7 @@ defmodule Bedrock.JobQueue do
@otp_app Keyword.fetch!(unquote(opts), :otp_app)
@repo Keyword.fetch!(unquote(opts), :repo)
@workers Keyword.get(unquote(opts), :workers, %{})
@action_hook Keyword.get(unquote(opts), :on_action)

@doc """
Returns a child specification for this JobQueue.
Expand Down Expand Up @@ -157,7 +160,8 @@ defmodule Bedrock.JobQueue do
def stats(queue_id, opts \\ []), do: Internal.stats(__MODULE__, queue_id, opts)

@doc false
def __config__, do: %{otp_app: @otp_app, repo: @repo, workers: @workers}
def __config__,
do: %{otp_app: @otp_app, repo: @repo, workers: @workers, action_hook: @action_hook}
end
end
end
8 changes: 8 additions & 0 deletions lib/bedrock/job_queue/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ defmodule Bedrock.JobQueue.Consumer do

- `:repo` - Required. The Bedrock Repo module
- `:workers` - Required. Map of topic strings to job modules
- `:action_hook` - Optional hook invoked inside queue action transactions
- `:name` - Process name (default: `Bedrock.JobQueue.Consumer`)
- `:root` - Required. Root keyspace (from Directory)
- `:concurrency` - Number of concurrent workers (default: `System.schedulers_online()`)
- `:batch_size` - Items to dequeue per batch (default: 10)
- `:scan_interval` - How often to scan for ready queues in ms (default: 100)
- `:lease_duration` - Item lease duration in ms (default: 30_000)
- `:queue_lease_duration` - Queue lease duration in ms (default: 5_000)
- `:backoff_fn` - Retry backoff function (default: `Bedrock.JobQueue.Config.default_backoff/1`)
- `:gc_interval` - How often to garbage collect stale pointers in ms (default: 60_000)
- `:gc_grace_period` - Grace period before GC considers pointer stale in ms (default: 60_000)
Expand Down Expand Up @@ -62,6 +65,8 @@ defmodule Bedrock.JobQueue.Consumer do
concurrency = Keyword.get(opts, :concurrency, System.schedulers_online())
batch_size = Keyword.get(opts, :batch_size, 10)
scan_interval = Keyword.get(opts, :scan_interval, 100)
lease_duration = Keyword.get(opts, :lease_duration, 30_000)
queue_lease_duration = Keyword.get(opts, :queue_lease_duration, 5_000)
backoff_fn = Keyword.get(opts, :backoff_fn, &Config.default_backoff/1)

# Generate unique names for child processes
Expand All @@ -81,9 +86,12 @@ defmodule Bedrock.JobQueue.Consumer do
repo: repo,
root: root,
workers: workers,
action_hook: Keyword.get(opts, :action_hook),
worker_pool: pool_name,
concurrency: concurrency,
batch_size: batch_size,
lease_duration: lease_duration,
queue_lease_duration: queue_lease_duration,
backoff_fn: backoff_fn},
{Scanner,
name: scanner_name,
Expand Down
13 changes: 12 additions & 1 deletion lib/bedrock/job_queue/consumer/lease_extender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,21 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtender do
end)

case result do
{:ok, {:ok, updated_lease}} ->
{:ok, updated_lease = %Lease{}} ->
Logger.debug("Extended lease for item #{Base.encode16(lease.item_id, case: :lower)}")
updated_lease

{:ok, {:ok, updated_lease = %Lease{}}} ->
Logger.debug("Extended lease for item #{Base.encode16(lease.item_id, case: :lower)}")
updated_lease

{:error, reason} when reason in [:not_found, :expired, :not_holder] ->
Logger.warning(
"Failed to extend lease for item #{Base.encode16(lease.item_id, case: :lower)}: #{inspect(reason)}"
)

lease

{:ok, {:error, reason}} ->
Logger.warning(
"Failed to extend lease for item #{Base.encode16(lease.item_id, case: :lower)}: #{inspect(reason)}"
Expand Down
63 changes: 50 additions & 13 deletions lib/bedrock/job_queue/consumer/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do
- `:repo` - Required. The Bedrock Repo module
- `:workers` - Required. Map of topic strings to job modules
- `:worker_pool` - Required. The Task.Supervisor for spawning job tasks
- `:action_hook` - Optional hook invoked inside queue action transactions
- `:name` - Process name (default: `Bedrock.JobQueue.Consumer.Manager`)
- `:root` - Root keyspace (default: `Keyspace.new("job_queue/")`)
- `:concurrency` - Max concurrent workers (default: `System.schedulers_online()`)
Expand Down Expand Up @@ -39,6 +40,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do
:repo,
:root,
:workers,
:action_hook,
:worker_pool,
:concurrency,
:batch_size,
Expand Down Expand Up @@ -66,6 +68,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do
repo: Keyword.fetch!(opts, :repo),
root: Keyword.fetch!(opts, :root),
workers: Keyword.fetch!(opts, :workers),
action_hook: Keyword.get(opts, :action_hook),
worker_pool: Keyword.fetch!(opts, :worker_pool),
concurrency: Keyword.get(opts, :concurrency, System.schedulers_online()),
batch_size: Keyword.get(opts, :batch_size, @default_batch_size),
Expand Down Expand Up @@ -252,36 +255,70 @@ defmodule Bedrock.JobQueue.Consumer.Manager do
defp handle_worker_result(state, lease, result) do
case result do
success when success in [:ok] or (is_tuple(success) and elem(success, 0) == :ok) ->
run_job_action(state, lease, :complete)
run_job_action(state, lease, :complete, result)

{:error, _reason} ->
run_job_action(state, lease, :requeue)
run_job_action(state, lease, :requeue, result)

{:discard, reason} ->
Logger.info(
"Discarding job #{Base.encode16(lease.item_id, case: :lower)}: #{inspect(reason)}"
)

run_job_action(state, lease, :complete)
run_job_action(state, lease, :complete, result)

{:snooze, delay_ms} ->
run_job_action(state, lease, {:snooze, delay_ms})
run_job_action(state, lease, {:snooze, delay_ms}, result)
end
end

defp run_job_action(state, lease, action) do
defp run_job_action(state, lease, action, handler_result) do
state.repo.transact(fn ->
case action do
:complete ->
Store.complete(state.repo, state.root, lease)
queue_result =
case action do
:complete ->
Store.complete(state.repo, state.root, lease)

:requeue ->
Store.requeue(state.repo, state.root, lease, backoff_fn: state.backoff_fn)
:requeue ->
Store.requeue(state.repo, state.root, lease, backoff_fn: state.backoff_fn)

{:snooze, delay_ms} ->
# Snooze uses explicit delay, bypassing backoff_fn
Store.requeue(state.repo, state.root, lease, base_delay: delay_ms, max_delay: delay_ms)
{:snooze, delay_ms} ->
# Snooze uses explicit delay, bypassing backoff_fn
Store.requeue(state.repo, state.root, lease, base_delay: delay_ms, max_delay: delay_ms)
end

with :ok <- normalize_queue_result(queue_result),
:ok <- run_action_hook(state, lease, action, handler_result, queue_result) do
queue_result
else
{:error, reason} -> state.repo.rollback(reason)
end
end)
end

defp normalize_queue_result(:ok), do: :ok
defp normalize_queue_result({:ok, _status}), do: :ok
defp normalize_queue_result({:error, reason}), do: {:error, reason}

defp run_action_hook(%{action_hook: nil}, _lease, _action, _handler_result, _queue_result), do: :ok

defp run_action_hook(state, lease, action, handler_result, queue_result) do
hook_args = [state.repo, state.root, lease, action, handler_result, queue_result]

hook_result =
case state.action_hook do
{module, function} ->
apply(module, function, hook_args)

{module, function, extra_args} when is_list(extra_args) ->
apply(module, function, hook_args ++ extra_args)
end

case hook_result do
:ok -> :ok
{:ok, _value} -> :ok
{:error, reason} -> {:error, reason}
other -> {:error, {:invalid_action_hook_return, other}}
end
end
end
3 changes: 3 additions & 0 deletions lib/bedrock/job_queue/item.ex
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ defmodule Bedrock.JobQueue.Item do

def visible?(%__MODULE__{vesting_time: vt, lease_id: nil}, now), do: now >= vt

def visible?(%__MODULE__{vesting_time: vt, lease_expires_at: exp}, now) when not is_nil(exp),
do: now >= vt and exp <= now

def visible?(%__MODULE__{}, _now), do: false

@doc """
Expand Down
88 changes: 53 additions & 35 deletions lib/bedrock/job_queue/store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
alias Bedrock.JobQueue.Item
alias Bedrock.JobQueue.Lease
alias Bedrock.JobQueue.QueueLease
alias Bedrock.KeyRange
alias Bedrock.Keyspace

@type repo :: module()
Expand Down Expand Up @@ -174,7 +175,7 @@

Items are visible when:
- vesting_time <= now
- lease_id is nil (not currently leased)
- lease_id is nil, or the item lease has expired

Options:
- :limit - Maximum items to return (default: 10)
Expand All @@ -191,8 +192,8 @@
# Scan items in priority order, collect visible ones
# Uses Stream to avoid loading all items into memory
# Stops early once we have enough visible items OR hit max_scan
keyspaces.items
|> repo.get_range(limit: max_scan)
repo
|> item_rows(keyspaces, limit: max_scan)
|> Stream.map(fn {_key, value} -> decode(value) end)
|> Stream.filter(&Item.visible?(&1, now))
|> Enum.take(limit)
Expand Down Expand Up @@ -228,13 +229,11 @@
lease_duration = Keyword.get(opts, :lease_duration, 30_000)
now = Keyword.get(opts, :now, System.system_time(:millisecond))

# Peek for visible items
items = peek(repo, root, queue_id, limit: limit, now: now)

# Obtain leases on each item
leases =
Enum.reduce(items, [], fn item, acc ->
case obtain_lease(repo, root, item, holder, lease_duration) do
case obtain_lease(repo, root, item, holder, lease_duration, now: now) do
{:ok, lease} -> [lease | acc]
{:error, _} -> acc
end
Expand Down Expand Up @@ -271,38 +270,46 @@
value ->
current_item = decode(value)

if current_item.lease_id == nil do
# Create lease
lease = Lease.new(current_item, holder, duration_ms: duration_ms, now: now)
lease_expires_at = now + duration_ms
cond do
current_item.lease_id == nil ->
create_lease(repo, root, keyspaces, pointers, current_item, item_key, holder, duration_ms, now,
stats_delta: {-1, 1}
)

# Update item with lease info and new vesting_time
updated_item = %{
current_item
| lease_id: lease.id,
lease_expires_at: lease_expires_at,
vesting_time: lease_expires_at
}
not Item.leased?(current_item, now: now) ->
create_lease(repo, root, keyspaces, pointers, current_item, item_key, holder, duration_ms, now,
stats_delta: {0, 0}
)

# Delete old item key (vesting_time changed)
repo.clear(keyspaces.items, item_key)
true ->
{:error, :already_leased}
end
end
end

# Write with new key (new vesting_time)
new_item_key = Item.key(updated_item)
repo.put(keyspaces.items, new_item_key, encode(updated_item))
defp create_lease(repo, _root, keyspaces, pointers, item, item_key, holder, duration_ms, now, opts) do

Check warning on line 290 in lib/bedrock/job_queue/store.ex

View workflow job for this annotation

GitHub Actions / Test on OTP 27.2 / Elixir 1.17.3

Function takes too many parameters (arity is 10, max is 8).

Check warning on line 290 in lib/bedrock/job_queue/store.ex

View workflow job for this annotation

GitHub Actions / Test on OTP 28.3 / Elixir 1.19.4

Function takes too many parameters (arity is 10, max is 8).
{pending_delta, processing_delta} = Keyword.fetch!(opts, :stats_delta)
lease = Lease.new(item, holder, duration_ms: duration_ms, now: now)
lease_expires_at = now + duration_ms

# Write lease record
repo.put(keyspaces.leases, lease.item_id, encode(lease))
updated_item = %{
item
| lease_id: lease.id,
lease_expires_at: lease_expires_at,
vesting_time: lease_expires_at
}

# Update pointer and stats
update_pointer(repo, pointers, lease_expires_at, item.queue_id, now)
update_stats(repo, keyspaces, -1, 1)
repo.clear(keyspaces.items, item_key)

{:ok, lease}
else
{:error, :already_leased}
end
end
new_item_key = Item.key(updated_item)
repo.put(keyspaces.items, new_item_key, encode(updated_item))

repo.put(keyspaces.leases, lease.item_id, encode(lease))

update_pointer(repo, pointers, lease_expires_at, item.queue_id, now)
update_stats(repo, keyspaces, pending_delta, processing_delta)

{:ok, lease}
end

@doc """
Expand Down Expand Up @@ -540,8 +547,8 @@

# Scan all items and find minimum vesting_time
# Items are sorted by {priority, vesting_time, id}, so we need to check all
keyspaces.items
|> repo.get_range(limit: limit)
repo
|> item_rows(keyspaces, limit: limit)
|> Enum.reduce(nil, fn {_key, value}, acc ->
item = decode(value)

Expand Down Expand Up @@ -686,11 +693,22 @@

defp queue_empty?(repo, root, queue_id) do
keyspaces = queue_keyspaces(root, queue_id)
repo.get_range(keyspaces.items, limit: 1) == []
item_rows(repo, keyspaces, limit: 1) == []
end

# Private helpers

defp item_rows(repo, keyspaces, opts) do
if function_exported?(repo, :__cluster__, 0) do
keyspaces.items
|> Keyspace.prefix()
|> KeyRange.from_prefix()
|> repo.get_range(opts)
else
repo.get_range(keyspaces.items, opts)
end
end

defp encode(term), do: :erlang.term_to_binary(term)
defp decode(binary), do: :erlang.binary_to_term(binary)

Expand Down
23 changes: 19 additions & 4 deletions lib/bedrock/job_queue/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

- `:concurrency` - Number of concurrent workers (default: System.schedulers_online())
- `:batch_size` - Items to dequeue per batch (default: 10)
- `:scan_interval` - Scanner interval in ms (default: 100)
- `:lease_duration` - Item lease duration in ms (default: 30_000)
- `:queue_lease_duration` - Queue lease duration in ms (default: 5_000)
- `:root` - Optional precomputed queue root keyspace. When omitted, the
supervisor initializes the queue directory through Bedrock.
"""
def start_link(job_queue_module, opts \\ []) do
config = job_queue_module.__config__()

# Initialize the directory and cache the keyspace before starting supervisor
{:ok, root} = Internal.init_root(config.repo, job_queue_module)
root = Keyword.get_lazy(opts, :root, fn -> init_root!(config.repo, job_queue_module) end)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking: If callers pass a precomputed root, the consumer starts against that root, but the public JobQueue.enqueue/4 path still resolves its root via Internal.root_keyspace/1. Because this path skips Internal.init_root/2, nothing writes the supplied root into :persistent_term, so enqueue can fall back to Keyspace.new("job_queue/<module>/") while the consumer scans the supplied root. Either cache the provided root here or make the public enqueue/stats path read the same configured root.


Supervisor.start_link(__MODULE__, {job_queue_module, root, opts}, name: job_queue_module)
end
Expand All @@ -37,10 +40,22 @@
repo: config.repo,
root: root,
workers: config.workers,
action_hook: Keyword.get(opts, :action_hook, Map.get(config, :action_hook)),
concurrency: Keyword.get(opts, :concurrency, System.schedulers_online()),
batch_size: Keyword.get(opts, :batch_size, 10)}
batch_size: Keyword.get(opts, :batch_size, 10),
scan_interval: Keyword.get(opts, :scan_interval, 100),
lease_duration: Keyword.get(opts, :lease_duration, 30_000),
queue_lease_duration: Keyword.get(opts, :queue_lease_duration, 5_000),
backoff_fn: Keyword.get(opts, :backoff_fn, &Bedrock.JobQueue.Config.default_backoff/1),

Check warning on line 49 in lib/bedrock/job_queue/supervisor.ex

View workflow job for this annotation

GitHub Actions / Test on OTP 27.2 / Elixir 1.17.3

Nested modules could be aliased at the top of the invoking module.

Check warning on line 49 in lib/bedrock/job_queue/supervisor.ex

View workflow job for this annotation

GitHub Actions / Test on OTP 28.3 / Elixir 1.19.4

Nested modules could be aliased at the top of the invoking module.
gc_interval: Keyword.get(opts, :gc_interval, 60_000),
gc_grace_period: Keyword.get(opts, :gc_grace_period, 60_000)}
]

Supervisor.init(children, strategy: :one_for_one)
end

defp init_root!(repo, job_queue_module) do
{:ok, root} = Internal.init_root(repo, job_queue_module)
root
end
end
Loading
Loading