From b5dab2338a6135937753a0b06ca15fc4857d6fe2 Mon Sep 17 00:00:00 2001 From: Mike Hostetler <84222+mikehostetler@users.noreply.github.com> Date: Thu, 14 May 2026 11:56:22 -0500 Subject: [PATCH] feat: add queue action hooks Supports intent-ledger-28h.3 --- lib/bedrock/job_queue.ex | 6 +- lib/bedrock/job_queue/consumer.ex | 2 + lib/bedrock/job_queue/consumer/manager.ex | 61 +++++++++++++++---- lib/bedrock/job_queue/supervisor.ex | 1 + .../job_queue/consumer/manager_test.exs | 18 ++++++ 5 files changed, 74 insertions(+), 14 deletions(-) diff --git a/lib/bedrock/job_queue.ex b/lib/bedrock/job_queue.ex index bb9a3ad..6ea03a0 100644 --- a/lib/bedrock/job_queue.ex +++ b/lib/bedrock/job_queue.ex @@ -84,6 +84,8 @@ defmodule Bedrock.JobQueue do - `:otp_app` - The OTP application name (required) - `:repo` - The Bedrock Repo module (required) - `:workers` - Map of topic strings to job modules (default: %{}) + - `:on_action` - Optional `{module, function, extra_args}` hook invoked in + the same transaction as queue completion/requeue actions ## Example @@ -104,6 +106,7 @@ defmodule Bedrock.JobQueue do @otp_app Keyword.fetch!(unquote(opts), :otp_app) @repo Keyword.fetch!(unquote(opts), :repo) @workers Keyword.get(unquote(opts), :workers, %{}) + @action_hook Keyword.get(unquote(opts), :on_action) @doc """ Returns a child specification for this JobQueue. @@ -157,7 +160,8 @@ defmodule Bedrock.JobQueue do def stats(queue_id, opts \\ []), do: Internal.stats(__MODULE__, queue_id, opts) @doc false - def __config__, do: %{otp_app: @otp_app, repo: @repo, workers: @workers} + def __config__, + do: %{otp_app: @otp_app, repo: @repo, workers: @workers, action_hook: @action_hook} end end end diff --git a/lib/bedrock/job_queue/consumer.ex b/lib/bedrock/job_queue/consumer.ex index 60b33cd..d0337b8 100644 --- a/lib/bedrock/job_queue/consumer.ex +++ b/lib/bedrock/job_queue/consumer.ex @@ -25,6 +25,7 @@ defmodule Bedrock.JobQueue.Consumer do - `:repo` - Required. The Bedrock Repo module - `:workers` - Required. Map of topic strings to job modules + - `:action_hook` - Optional hook invoked inside queue action transactions - `:name` - Process name (default: `Bedrock.JobQueue.Consumer`) - `:root` - Required. Root keyspace (from Directory) - `:concurrency` - Number of concurrent workers (default: `System.schedulers_online()`) @@ -81,6 +82,7 @@ defmodule Bedrock.JobQueue.Consumer do repo: repo, root: root, workers: workers, + action_hook: Keyword.get(opts, :action_hook), worker_pool: pool_name, concurrency: concurrency, batch_size: batch_size, diff --git a/lib/bedrock/job_queue/consumer/manager.ex b/lib/bedrock/job_queue/consumer/manager.ex index e1c48e9..0ee9b73 100644 --- a/lib/bedrock/job_queue/consumer/manager.ex +++ b/lib/bedrock/job_queue/consumer/manager.ex @@ -11,6 +11,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do - `:repo` - Required. The Bedrock Repo module - `:workers` - Required. Map of topic strings to job modules - `:worker_pool` - Required. The Task.Supervisor for spawning job tasks + - `:action_hook` - Optional hook invoked inside queue action transactions - `:name` - Process name (default: `Bedrock.JobQueue.Consumer.Manager`) - `:root` - Root keyspace (default: `Keyspace.new("job_queue/")`) - `:concurrency` - Max concurrent workers (default: `System.schedulers_online()`) @@ -39,6 +40,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do :repo, :root, :workers, + :action_hook, :worker_pool, :concurrency, :batch_size, @@ -66,6 +68,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do repo: Keyword.fetch!(opts, :repo), root: Keyword.fetch!(opts, :root), workers: Keyword.fetch!(opts, :workers), + action_hook: Keyword.get(opts, :action_hook), worker_pool: Keyword.fetch!(opts, :worker_pool), concurrency: Keyword.get(opts, :concurrency, System.schedulers_online()), batch_size: Keyword.get(opts, :batch_size, @default_batch_size), @@ -252,36 +255,68 @@ defmodule Bedrock.JobQueue.Consumer.Manager do defp handle_worker_result(state, lease, result) do case result do success when success in [:ok] or (is_tuple(success) and elem(success, 0) == :ok) -> - run_job_action(state, lease, :complete) + run_job_action(state, lease, :complete, result) {:error, _reason} -> - run_job_action(state, lease, :requeue) + run_job_action(state, lease, :requeue, result) {:discard, reason} -> Logger.info( "Discarding job #{Base.encode16(lease.item_id, case: :lower)}: #{inspect(reason)}" ) - run_job_action(state, lease, :complete) + run_job_action(state, lease, :complete, result) {:snooze, delay_ms} -> - run_job_action(state, lease, {:snooze, delay_ms}) + run_job_action(state, lease, {:snooze, delay_ms}, result) end end - defp run_job_action(state, lease, action) do + defp run_job_action(state, lease, action, handler_result) do state.repo.transact(fn -> - case action do - :complete -> - Store.complete(state.repo, state.root, lease) + queue_result = + case action do + :complete -> + Store.complete(state.repo, state.root, lease) - :requeue -> - Store.requeue(state.repo, state.root, lease, backoff_fn: state.backoff_fn) + :requeue -> + Store.requeue(state.repo, state.root, lease, backoff_fn: state.backoff_fn) - {:snooze, delay_ms} -> - # Snooze uses explicit delay, bypassing backoff_fn - Store.requeue(state.repo, state.root, lease, base_delay: delay_ms, max_delay: delay_ms) + {:snooze, delay_ms} -> + # Snooze uses explicit delay, bypassing backoff_fn + Store.requeue(state.repo, state.root, lease, base_delay: delay_ms, max_delay: delay_ms) + end + + with :ok <- normalize_queue_result(queue_result), + :ok <- run_action_hook(state, lease, action, handler_result, queue_result) do + queue_result end end) end + + defp normalize_queue_result(:ok), do: :ok + defp normalize_queue_result({:ok, _status}), do: :ok + defp normalize_queue_result({:error, reason}), do: {:error, reason} + + defp run_action_hook(%{action_hook: nil}, _lease, _action, _handler_result, _queue_result), do: :ok + + defp run_action_hook(state, lease, action, handler_result, queue_result) do + hook_args = [state.repo, state.root, lease, action, handler_result, queue_result] + + hook_result = + case state.action_hook do + {module, function} -> + apply(module, function, hook_args) + + {module, function, extra_args} when is_list(extra_args) -> + apply(module, function, hook_args ++ extra_args) + end + + case hook_result do + :ok -> :ok + {:ok, _value} -> :ok + {:error, reason} -> {:error, reason} + other -> {:error, {:invalid_action_hook_return, other}} + end + end end diff --git a/lib/bedrock/job_queue/supervisor.ex b/lib/bedrock/job_queue/supervisor.ex index 4054d67..4a0e683 100644 --- a/lib/bedrock/job_queue/supervisor.ex +++ b/lib/bedrock/job_queue/supervisor.ex @@ -37,6 +37,7 @@ defmodule Bedrock.JobQueue.Supervisor do repo: config.repo, root: root, workers: config.workers, + action_hook: Keyword.get(opts, :action_hook, Map.get(config, :action_hook)), concurrency: Keyword.get(opts, :concurrency, System.schedulers_online()), batch_size: Keyword.get(opts, :batch_size, 10)} ] diff --git a/test/bedrock/job_queue/consumer/manager_test.exs b/test/bedrock/job_queue/consumer/manager_test.exs index 9135e47..1d010ee 100644 --- a/test/bedrock/job_queue/consumer/manager_test.exs +++ b/test/bedrock/job_queue/consumer/manager_test.exs @@ -25,6 +25,13 @@ defmodule Bedrock.JobQueue.Consumer.ManagerTest do def timeout, do: 1000 end + defmodule ActionHook do + def apply(repo, root, lease, action, handler_result, queue_result, test_pid) do + send(test_pid, {:action_hook, repo, root, lease.item_id, action, handler_result, queue_result}) + :ok + end + end + setup do pool_name = :"TestPool_#{System.unique_integer()}" {:ok, pool} = Task.Supervisor.start_link(name: pool_name, max_children: 5) @@ -125,5 +132,16 @@ defmodule Bedrock.JobQueue.Consumer.ManagerTest do _ = :sys.get_state(manager) assert Process.alive?(manager) end + + test "runs action hook inside successful queue action", ctx do + item = enqueue_item(ctx, "test:success") + manager = start_manager(ctx, action_hook: {ActionHook, :apply, [self()]}) + + send(manager, {:queue_ready, "tenant_1"}) + + assert_receive {:action_hook, MockRepo, root, item_id, :complete, :ok, :ok} + assert root == ctx.root + assert item_id == item.id + end end end