Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/bedrock/job_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ defmodule Bedrock.JobQueue do
- `:otp_app` - The OTP application name (required)
- `:repo` - The Bedrock Repo module (required)
- `:workers` - Map of topic strings to job modules (default: %{})
- `:on_action` - Optional `{module, function, extra_args}` hook invoked in
the same transaction as queue completion/requeue actions

## Example

Expand All @@ -104,6 +106,7 @@ defmodule Bedrock.JobQueue do
@otp_app Keyword.fetch!(unquote(opts), :otp_app)
@repo Keyword.fetch!(unquote(opts), :repo)
@workers Keyword.get(unquote(opts), :workers, %{})
@action_hook Keyword.get(unquote(opts), :on_action)

@doc """
Returns a child specification for this JobQueue.
Expand Down Expand Up @@ -157,7 +160,8 @@ defmodule Bedrock.JobQueue do
def stats(queue_id, opts \\ []), do: Internal.stats(__MODULE__, queue_id, opts)

@doc false
def __config__, do: %{otp_app: @otp_app, repo: @repo, workers: @workers}
def __config__,
do: %{otp_app: @otp_app, repo: @repo, workers: @workers, action_hook: @action_hook}
end
end
end
2 changes: 2 additions & 0 deletions lib/bedrock/job_queue/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Bedrock.JobQueue.Consumer do

- `:repo` - Required. The Bedrock Repo module
- `:workers` - Required. Map of topic strings to job modules
- `:action_hook` - Optional hook invoked inside queue action transactions
- `:name` - Process name (default: `Bedrock.JobQueue.Consumer`)
- `:root` - Required. Root keyspace (from Directory)
- `:concurrency` - Number of concurrent workers (default: `System.schedulers_online()`)
Expand Down Expand Up @@ -81,6 +82,7 @@ defmodule Bedrock.JobQueue.Consumer do
repo: repo,
root: root,
workers: workers,
action_hook: Keyword.get(opts, :action_hook),
worker_pool: pool_name,
concurrency: concurrency,
batch_size: batch_size,
Expand Down
61 changes: 48 additions & 13 deletions lib/bedrock/job_queue/consumer/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do
- `:repo` - Required. The Bedrock Repo module
- `:workers` - Required. Map of topic strings to job modules
- `:worker_pool` - Required. The Task.Supervisor for spawning job tasks
- `:action_hook` - Optional hook invoked inside queue action transactions
- `:name` - Process name (default: `Bedrock.JobQueue.Consumer.Manager`)
- `:root` - Root keyspace (default: `Keyspace.new("job_queue/")`)
- `:concurrency` - Max concurrent workers (default: `System.schedulers_online()`)
Expand Down Expand Up @@ -39,6 +40,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do
:repo,
:root,
:workers,
:action_hook,
:worker_pool,
:concurrency,
:batch_size,
Expand Down Expand Up @@ -66,6 +68,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do
repo: Keyword.fetch!(opts, :repo),
root: Keyword.fetch!(opts, :root),
workers: Keyword.fetch!(opts, :workers),
action_hook: Keyword.get(opts, :action_hook),
worker_pool: Keyword.fetch!(opts, :worker_pool),
concurrency: Keyword.get(opts, :concurrency, System.schedulers_online()),
batch_size: Keyword.get(opts, :batch_size, @default_batch_size),
Expand Down Expand Up @@ -252,36 +255,68 @@ defmodule Bedrock.JobQueue.Consumer.Manager do
defp handle_worker_result(state, lease, result) do
case result do
success when success in [:ok] or (is_tuple(success) and elem(success, 0) == :ok) ->
run_job_action(state, lease, :complete)
run_job_action(state, lease, :complete, result)

{:error, _reason} ->
run_job_action(state, lease, :requeue)
run_job_action(state, lease, :requeue, result)

{:discard, reason} ->
Logger.info(
"Discarding job #{Base.encode16(lease.item_id, case: :lower)}: #{inspect(reason)}"
)

run_job_action(state, lease, :complete)
run_job_action(state, lease, :complete, result)

{:snooze, delay_ms} ->
run_job_action(state, lease, {:snooze, delay_ms})
run_job_action(state, lease, {:snooze, delay_ms}, result)
end
end

defp run_job_action(state, lease, action) do
defp run_job_action(state, lease, action, handler_result) do
state.repo.transact(fn ->
case action do
:complete ->
Store.complete(state.repo, state.root, lease)
queue_result =
case action do
:complete ->
Store.complete(state.repo, state.root, lease)

:requeue ->
Store.requeue(state.repo, state.root, lease, backoff_fn: state.backoff_fn)
:requeue ->
Store.requeue(state.repo, state.root, lease, backoff_fn: state.backoff_fn)

{:snooze, delay_ms} ->
# Snooze uses explicit delay, bypassing backoff_fn
Store.requeue(state.repo, state.root, lease, base_delay: delay_ms, max_delay: delay_ms)
{:snooze, delay_ms} ->
# Snooze uses explicit delay, bypassing backoff_fn
Store.requeue(state.repo, state.root, lease, base_delay: delay_ms, max_delay: delay_ms)
end

with :ok <- normalize_queue_result(queue_result),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blocking if this PR merges independently: when run_action_hook/5 returns {:error, reason}, this with returns an error tuple from inside repo.transact/1; Bedrock commits returned values unless repo.rollback/1 is called. PR #2 adds the rollback path, so I would either merge these as a stack or pull that rollback handling into this PR before merging #1.

:ok <- run_action_hook(state, lease, action, handler_result, queue_result) do
queue_result
end
end)
end

defp normalize_queue_result(:ok), do: :ok
defp normalize_queue_result({:ok, _status}), do: :ok
defp normalize_queue_result({:error, reason}), do: {:error, reason}

defp run_action_hook(%{action_hook: nil}, _lease, _action, _handler_result, _queue_result), do: :ok

defp run_action_hook(state, lease, action, handler_result, queue_result) do
hook_args = [state.repo, state.root, lease, action, handler_result, queue_result]

hook_result =
case state.action_hook do
{module, function} ->
apply(module, function, hook_args)

{module, function, extra_args} when is_list(extra_args) ->
apply(module, function, hook_args ++ extra_args)
end

case hook_result do
:ok -> :ok
{:ok, _value} -> :ok
{:error, reason} -> {:error, reason}
other -> {:error, {:invalid_action_hook_return, other}}
end
end
end
1 change: 1 addition & 0 deletions lib/bedrock/job_queue/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ defmodule Bedrock.JobQueue.Supervisor do
repo: config.repo,
root: root,
workers: config.workers,
action_hook: Keyword.get(opts, :action_hook, Map.get(config, :action_hook)),
concurrency: Keyword.get(opts, :concurrency, System.schedulers_online()),
batch_size: Keyword.get(opts, :batch_size, 10)}
]
Expand Down
18 changes: 18 additions & 0 deletions test/bedrock/job_queue/consumer/manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ defmodule Bedrock.JobQueue.Consumer.ManagerTest do
def timeout, do: 1000
end

defmodule ActionHook do
def apply(repo, root, lease, action, handler_result, queue_result, test_pid) do
send(test_pid, {:action_hook, repo, root, lease.item_id, action, handler_result, queue_result})
:ok
end
end

setup do
pool_name = :"TestPool_#{System.unique_integer()}"
{:ok, pool} = Task.Supervisor.start_link(name: pool_name, max_children: 5)
Expand Down Expand Up @@ -125,5 +132,16 @@ defmodule Bedrock.JobQueue.Consumer.ManagerTest do
_ = :sys.get_state(manager)
assert Process.alive?(manager)
end

test "runs action hook inside successful queue action", ctx do
item = enqueue_item(ctx, "test:success")
manager = start_manager(ctx, action_hook: {ActionHook, :apply, [self()]})

send(manager, {:queue_ready, "tenant_1"})

assert_receive {:action_hook, MockRepo, root, item_id, :complete, :ok, :ok}
assert root == ctx.root
assert item_id == item.id
end
end
end
Loading