diff --git a/lib/bedrock/job_queue.ex b/lib/bedrock/job_queue.ex index bb9a3ad..6ea03a0 100644 --- a/lib/bedrock/job_queue.ex +++ b/lib/bedrock/job_queue.ex @@ -84,6 +84,8 @@ defmodule Bedrock.JobQueue do - `:otp_app` - The OTP application name (required) - `:repo` - The Bedrock Repo module (required) - `:workers` - Map of topic strings to job modules (default: %{}) + - `:on_action` - Optional `{module, function, extra_args}` hook invoked in + the same transaction as queue completion/requeue actions ## Example @@ -104,6 +106,7 @@ defmodule Bedrock.JobQueue do @otp_app Keyword.fetch!(unquote(opts), :otp_app) @repo Keyword.fetch!(unquote(opts), :repo) @workers Keyword.get(unquote(opts), :workers, %{}) + @action_hook Keyword.get(unquote(opts), :on_action) @doc """ Returns a child specification for this JobQueue. @@ -157,7 +160,8 @@ defmodule Bedrock.JobQueue do def stats(queue_id, opts \\ []), do: Internal.stats(__MODULE__, queue_id, opts) @doc false - def __config__, do: %{otp_app: @otp_app, repo: @repo, workers: @workers} + def __config__, + do: %{otp_app: @otp_app, repo: @repo, workers: @workers, action_hook: @action_hook} end end end diff --git a/lib/bedrock/job_queue/consumer.ex b/lib/bedrock/job_queue/consumer.ex index 60b33cd..999e11d 100644 --- a/lib/bedrock/job_queue/consumer.ex +++ b/lib/bedrock/job_queue/consumer.ex @@ -25,11 +25,14 @@ defmodule Bedrock.JobQueue.Consumer do - `:repo` - Required. The Bedrock Repo module - `:workers` - Required. Map of topic strings to job modules + - `:action_hook` - Optional hook invoked inside queue action transactions - `:name` - Process name (default: `Bedrock.JobQueue.Consumer`) - `:root` - Required. Root keyspace (from Directory) - `:concurrency` - Number of concurrent workers (default: `System.schedulers_online()`) - `:batch_size` - Items to dequeue per batch (default: 10) - `:scan_interval` - How often to scan for ready queues in ms (default: 100) + - `:lease_duration` - Item lease duration in ms (default: 30_000) + - `:queue_lease_duration` - Queue lease duration in ms (default: 5_000) - `:backoff_fn` - Retry backoff function (default: `Bedrock.JobQueue.Config.default_backoff/1`) - `:gc_interval` - How often to garbage collect stale pointers in ms (default: 60_000) - `:gc_grace_period` - Grace period before GC considers pointer stale in ms (default: 60_000) @@ -62,6 +65,8 @@ defmodule Bedrock.JobQueue.Consumer do concurrency = Keyword.get(opts, :concurrency, System.schedulers_online()) batch_size = Keyword.get(opts, :batch_size, 10) scan_interval = Keyword.get(opts, :scan_interval, 100) + lease_duration = Keyword.get(opts, :lease_duration, 30_000) + queue_lease_duration = Keyword.get(opts, :queue_lease_duration, 5_000) backoff_fn = Keyword.get(opts, :backoff_fn, &Config.default_backoff/1) # Generate unique names for child processes @@ -81,9 +86,12 @@ defmodule Bedrock.JobQueue.Consumer do repo: repo, root: root, workers: workers, + action_hook: Keyword.get(opts, :action_hook), worker_pool: pool_name, concurrency: concurrency, batch_size: batch_size, + lease_duration: lease_duration, + queue_lease_duration: queue_lease_duration, backoff_fn: backoff_fn}, {Scanner, name: scanner_name, diff --git a/lib/bedrock/job_queue/consumer/lease_extender.ex b/lib/bedrock/job_queue/consumer/lease_extender.ex index c9cd079..4bc5ee1 100644 --- a/lib/bedrock/job_queue/consumer/lease_extender.ex +++ b/lib/bedrock/job_queue/consumer/lease_extender.ex @@ -80,10 +80,21 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtender do end) case result do - {:ok, {:ok, updated_lease}} -> + {:ok, updated_lease = %Lease{}} -> Logger.debug("Extended lease for item #{Base.encode16(lease.item_id, case: :lower)}") updated_lease + {:ok, {:ok, updated_lease = %Lease{}}} -> + Logger.debug("Extended lease for item #{Base.encode16(lease.item_id, case: :lower)}") + updated_lease + + {:error, reason} when reason in [:not_found, :expired, :not_holder] -> + Logger.warning( + "Failed to extend lease for item #{Base.encode16(lease.item_id, case: :lower)}: #{inspect(reason)}" + ) + + lease + {:ok, {:error, reason}} -> Logger.warning( "Failed to extend lease for item #{Base.encode16(lease.item_id, case: :lower)}: #{inspect(reason)}" diff --git a/lib/bedrock/job_queue/consumer/manager.ex b/lib/bedrock/job_queue/consumer/manager.ex index e1c48e9..8bc5015 100644 --- a/lib/bedrock/job_queue/consumer/manager.ex +++ b/lib/bedrock/job_queue/consumer/manager.ex @@ -11,6 +11,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do - `:repo` - Required. The Bedrock Repo module - `:workers` - Required. Map of topic strings to job modules - `:worker_pool` - Required. The Task.Supervisor for spawning job tasks + - `:action_hook` - Optional hook invoked inside queue action transactions - `:name` - Process name (default: `Bedrock.JobQueue.Consumer.Manager`) - `:root` - Root keyspace (default: `Keyspace.new("job_queue/")`) - `:concurrency` - Max concurrent workers (default: `System.schedulers_online()`) @@ -39,6 +40,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do :repo, :root, :workers, + :action_hook, :worker_pool, :concurrency, :batch_size, @@ -66,6 +68,7 @@ defmodule Bedrock.JobQueue.Consumer.Manager do repo: Keyword.fetch!(opts, :repo), root: Keyword.fetch!(opts, :root), workers: Keyword.fetch!(opts, :workers), + action_hook: Keyword.get(opts, :action_hook), worker_pool: Keyword.fetch!(opts, :worker_pool), concurrency: Keyword.get(opts, :concurrency, System.schedulers_online()), batch_size: Keyword.get(opts, :batch_size, @default_batch_size), @@ -252,36 +255,70 @@ defmodule Bedrock.JobQueue.Consumer.Manager do defp handle_worker_result(state, lease, result) do case result do success when success in [:ok] or (is_tuple(success) and elem(success, 0) == :ok) -> - run_job_action(state, lease, :complete) + run_job_action(state, lease, :complete, result) {:error, _reason} -> - run_job_action(state, lease, :requeue) + run_job_action(state, lease, :requeue, result) {:discard, reason} -> Logger.info( "Discarding job #{Base.encode16(lease.item_id, case: :lower)}: #{inspect(reason)}" ) - run_job_action(state, lease, :complete) + run_job_action(state, lease, :complete, result) {:snooze, delay_ms} -> - run_job_action(state, lease, {:snooze, delay_ms}) + run_job_action(state, lease, {:snooze, delay_ms}, result) end end - defp run_job_action(state, lease, action) do + defp run_job_action(state, lease, action, handler_result) do state.repo.transact(fn -> - case action do - :complete -> - Store.complete(state.repo, state.root, lease) + queue_result = + case action do + :complete -> + Store.complete(state.repo, state.root, lease) - :requeue -> - Store.requeue(state.repo, state.root, lease, backoff_fn: state.backoff_fn) + :requeue -> + Store.requeue(state.repo, state.root, lease, backoff_fn: state.backoff_fn) - {:snooze, delay_ms} -> - # Snooze uses explicit delay, bypassing backoff_fn - Store.requeue(state.repo, state.root, lease, base_delay: delay_ms, max_delay: delay_ms) + {:snooze, delay_ms} -> + # Snooze uses explicit delay, bypassing backoff_fn + Store.requeue(state.repo, state.root, lease, base_delay: delay_ms, max_delay: delay_ms) + end + + with :ok <- normalize_queue_result(queue_result), + :ok <- run_action_hook(state, lease, action, handler_result, queue_result) do + queue_result + else + {:error, reason} -> state.repo.rollback(reason) end end) end + + defp normalize_queue_result(:ok), do: :ok + defp normalize_queue_result({:ok, _status}), do: :ok + defp normalize_queue_result({:error, reason}), do: {:error, reason} + + defp run_action_hook(%{action_hook: nil}, _lease, _action, _handler_result, _queue_result), do: :ok + + defp run_action_hook(state, lease, action, handler_result, queue_result) do + hook_args = [state.repo, state.root, lease, action, handler_result, queue_result] + + hook_result = + case state.action_hook do + {module, function} -> + apply(module, function, hook_args) + + {module, function, extra_args} when is_list(extra_args) -> + apply(module, function, hook_args ++ extra_args) + end + + case hook_result do + :ok -> :ok + {:ok, _value} -> :ok + {:error, reason} -> {:error, reason} + other -> {:error, {:invalid_action_hook_return, other}} + end + end end diff --git a/lib/bedrock/job_queue/item.ex b/lib/bedrock/job_queue/item.ex index d5db0c0..d26ec90 100644 --- a/lib/bedrock/job_queue/item.ex +++ b/lib/bedrock/job_queue/item.ex @@ -91,6 +91,9 @@ defmodule Bedrock.JobQueue.Item do def visible?(%__MODULE__{vesting_time: vt, lease_id: nil}, now), do: now >= vt + def visible?(%__MODULE__{vesting_time: vt, lease_expires_at: exp}, now) when not is_nil(exp), + do: now >= vt and exp <= now + def visible?(%__MODULE__{}, _now), do: false @doc """ diff --git a/lib/bedrock/job_queue/store.ex b/lib/bedrock/job_queue/store.ex index cb7b7bd..b0ccde1 100644 --- a/lib/bedrock/job_queue/store.ex +++ b/lib/bedrock/job_queue/store.ex @@ -33,6 +33,7 @@ defmodule Bedrock.JobQueue.Store do alias Bedrock.JobQueue.Item alias Bedrock.JobQueue.Lease alias Bedrock.JobQueue.QueueLease + alias Bedrock.KeyRange alias Bedrock.Keyspace @type repo :: module() @@ -174,7 +175,7 @@ defmodule Bedrock.JobQueue.Store do Items are visible when: - vesting_time <= now - - lease_id is nil (not currently leased) + - lease_id is nil, or the item lease has expired Options: - :limit - Maximum items to return (default: 10) @@ -191,8 +192,8 @@ defmodule Bedrock.JobQueue.Store do # Scan items in priority order, collect visible ones # Uses Stream to avoid loading all items into memory # Stops early once we have enough visible items OR hit max_scan - keyspaces.items - |> repo.get_range(limit: max_scan) + repo + |> item_rows(keyspaces, limit: max_scan) |> Stream.map(fn {_key, value} -> decode(value) end) |> Stream.filter(&Item.visible?(&1, now)) |> Enum.take(limit) @@ -228,13 +229,11 @@ defmodule Bedrock.JobQueue.Store do lease_duration = Keyword.get(opts, :lease_duration, 30_000) now = Keyword.get(opts, :now, System.system_time(:millisecond)) - # Peek for visible items items = peek(repo, root, queue_id, limit: limit, now: now) - # Obtain leases on each item leases = Enum.reduce(items, [], fn item, acc -> - case obtain_lease(repo, root, item, holder, lease_duration) do + case obtain_lease(repo, root, item, holder, lease_duration, now: now) do {:ok, lease} -> [lease | acc] {:error, _} -> acc end @@ -271,38 +270,46 @@ defmodule Bedrock.JobQueue.Store do value -> current_item = decode(value) - if current_item.lease_id == nil do - # Create lease - lease = Lease.new(current_item, holder, duration_ms: duration_ms, now: now) - lease_expires_at = now + duration_ms + cond do + current_item.lease_id == nil -> + create_lease(repo, root, keyspaces, pointers, current_item, item_key, holder, duration_ms, now, + stats_delta: {-1, 1} + ) - # Update item with lease info and new vesting_time - updated_item = %{ - current_item - | lease_id: lease.id, - lease_expires_at: lease_expires_at, - vesting_time: lease_expires_at - } + not Item.leased?(current_item, now: now) -> + create_lease(repo, root, keyspaces, pointers, current_item, item_key, holder, duration_ms, now, + stats_delta: {0, 0} + ) - # Delete old item key (vesting_time changed) - repo.clear(keyspaces.items, item_key) + true -> + {:error, :already_leased} + end + end + end - # Write with new key (new vesting_time) - new_item_key = Item.key(updated_item) - repo.put(keyspaces.items, new_item_key, encode(updated_item)) + defp create_lease(repo, _root, keyspaces, pointers, item, item_key, holder, duration_ms, now, opts) do + {pending_delta, processing_delta} = Keyword.fetch!(opts, :stats_delta) + lease = Lease.new(item, holder, duration_ms: duration_ms, now: now) + lease_expires_at = now + duration_ms - # Write lease record - repo.put(keyspaces.leases, lease.item_id, encode(lease)) + updated_item = %{ + item + | lease_id: lease.id, + lease_expires_at: lease_expires_at, + vesting_time: lease_expires_at + } - # Update pointer and stats - update_pointer(repo, pointers, lease_expires_at, item.queue_id, now) - update_stats(repo, keyspaces, -1, 1) + repo.clear(keyspaces.items, item_key) - {:ok, lease} - else - {:error, :already_leased} - end - end + new_item_key = Item.key(updated_item) + repo.put(keyspaces.items, new_item_key, encode(updated_item)) + + repo.put(keyspaces.leases, lease.item_id, encode(lease)) + + update_pointer(repo, pointers, lease_expires_at, item.queue_id, now) + update_stats(repo, keyspaces, pending_delta, processing_delta) + + {:ok, lease} end @doc """ @@ -540,8 +547,8 @@ defmodule Bedrock.JobQueue.Store do # Scan all items and find minimum vesting_time # Items are sorted by {priority, vesting_time, id}, so we need to check all - keyspaces.items - |> repo.get_range(limit: limit) + repo + |> item_rows(keyspaces, limit: limit) |> Enum.reduce(nil, fn {_key, value}, acc -> item = decode(value) @@ -686,11 +693,22 @@ defmodule Bedrock.JobQueue.Store do defp queue_empty?(repo, root, queue_id) do keyspaces = queue_keyspaces(root, queue_id) - repo.get_range(keyspaces.items, limit: 1) == [] + item_rows(repo, keyspaces, limit: 1) == [] end # Private helpers + defp item_rows(repo, keyspaces, opts) do + if function_exported?(repo, :__cluster__, 0) do + keyspaces.items + |> Keyspace.prefix() + |> KeyRange.from_prefix() + |> repo.get_range(opts) + else + repo.get_range(keyspaces.items, opts) + end + end + defp encode(term), do: :erlang.term_to_binary(term) defp decode(binary), do: :erlang.binary_to_term(binary) diff --git a/lib/bedrock/job_queue/supervisor.ex b/lib/bedrock/job_queue/supervisor.ex index 4054d67..afec453 100644 --- a/lib/bedrock/job_queue/supervisor.ex +++ b/lib/bedrock/job_queue/supervisor.ex @@ -17,12 +17,15 @@ defmodule Bedrock.JobQueue.Supervisor do - `:concurrency` - Number of concurrent workers (default: System.schedulers_online()) - `:batch_size` - Items to dequeue per batch (default: 10) + - `:scan_interval` - Scanner interval in ms (default: 100) + - `:lease_duration` - Item lease duration in ms (default: 30_000) + - `:queue_lease_duration` - Queue lease duration in ms (default: 5_000) + - `:root` - Optional precomputed queue root keyspace. When omitted, the + supervisor initializes the queue directory through Bedrock. """ def start_link(job_queue_module, opts \\ []) do config = job_queue_module.__config__() - - # Initialize the directory and cache the keyspace before starting supervisor - {:ok, root} = Internal.init_root(config.repo, job_queue_module) + root = Keyword.get_lazy(opts, :root, fn -> init_root!(config.repo, job_queue_module) end) Supervisor.start_link(__MODULE__, {job_queue_module, root, opts}, name: job_queue_module) end @@ -37,10 +40,22 @@ defmodule Bedrock.JobQueue.Supervisor do repo: config.repo, root: root, workers: config.workers, + action_hook: Keyword.get(opts, :action_hook, Map.get(config, :action_hook)), concurrency: Keyword.get(opts, :concurrency, System.schedulers_online()), - batch_size: Keyword.get(opts, :batch_size, 10)} + batch_size: Keyword.get(opts, :batch_size, 10), + scan_interval: Keyword.get(opts, :scan_interval, 100), + lease_duration: Keyword.get(opts, :lease_duration, 30_000), + queue_lease_duration: Keyword.get(opts, :queue_lease_duration, 5_000), + backoff_fn: Keyword.get(opts, :backoff_fn, &Bedrock.JobQueue.Config.default_backoff/1), + gc_interval: Keyword.get(opts, :gc_interval, 60_000), + gc_grace_period: Keyword.get(opts, :gc_grace_period, 60_000)} ] Supervisor.init(children, strategy: :one_for_one) end + + defp init_root!(repo, job_queue_module) do + {:ok, root} = Internal.init_root(repo, job_queue_module) + root + end end diff --git a/test/bedrock/job_queue/consumer/lease_extender_test.exs b/test/bedrock/job_queue/consumer/lease_extender_test.exs index 1374035..fc01cab 100644 --- a/test/bedrock/job_queue/consumer/lease_extender_test.exs +++ b/test/bedrock/job_queue/consumer/lease_extender_test.exs @@ -126,6 +126,7 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do send(test_pid, :done) result end) + expect(MockRepo, :get, fn _, _ -> :erlang.term_to_binary(ctx.lease) end) expect(MockRepo, :get, fn _, _ -> :erlang.term_to_binary(ctx.leased_item) end) expect(MockRepo, :clear, fn _, _ -> :ok end) @@ -134,9 +135,37 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do expect(MockRepo, :max, fn _, _ -> :ok end) log = - capture_log(fn -> + capture_log([level: :debug], fn -> pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 10) assert_receive :done, 100 + Process.sleep(10) + LeaseExtender.stop(pid) + end) + + assert log =~ "Extended lease for item" + end + + test "accepts a direct successful transaction result from real Bedrock repos", ctx do + test_pid = self() + + expect(MockRepo, :transact, fn callback -> + result = callback.() + send(test_pid, :done) + result + end) + + expect(MockRepo, :get, fn _, _ -> :erlang.term_to_binary(ctx.lease) end) + expect(MockRepo, :get, fn _, _ -> :erlang.term_to_binary(ctx.leased_item) end) + expect(MockRepo, :clear, fn _, _ -> :ok end) + expect(MockRepo, :put, fn _, _, _ -> :ok end) + expect(MockRepo, :put, fn _, _, _ -> :ok end) + expect(MockRepo, :max, fn _, _ -> :ok end) + + log = + capture_log([level: :debug], fn -> + pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 10) + assert_receive :done, 100 + Process.sleep(10) LeaseExtender.stop(pid) end) @@ -151,6 +180,7 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do send(test_pid, :done) result end) + # verify_lease returns nil -> :lease_not_found expect(MockRepo, :get, fn ks, key -> assert Keyspace.prefix(ks) == Keyspace.prefix(ctx.keyspaces.leases) @@ -162,6 +192,7 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do capture_log(fn -> pid = LeaseExtender.start(MockRepo, ctx.root, ctx.lease, 30_000, interval: 10) assert_receive :done, 100 + Process.sleep(10) LeaseExtender.stop(pid) end) @@ -179,15 +210,23 @@ defmodule Bedrock.JobQueue.Consumer.LeaseExtenderTest do log = capture_log(fn -> - pid = LeaseExtender.start(MockRepo, Keyspace.new("test/"), %Lease{ - id: "lease_id", - item_id: <<1, 2, 3>>, - item_key: {100, 0, <<1, 2, 3>>}, - queue_id: "tenant_1", - holder: @holder_id, - obtained_at: System.system_time(:millisecond), - expires_at: System.system_time(:millisecond) + 30_000 - }, 30_000, interval: 10) + pid = + LeaseExtender.start( + MockRepo, + Keyspace.new("test/"), + %Lease{ + id: "lease_id", + item_id: <<1, 2, 3>>, + item_key: {100, 0, <<1, 2, 3>>}, + queue_id: "tenant_1", + holder: @holder_id, + obtained_at: System.system_time(:millisecond), + expires_at: System.system_time(:millisecond) + 30_000 + }, + 30_000, + interval: 10 + ) + assert_receive :done, 100 LeaseExtender.stop(pid) # Allow time for log to be captured diff --git a/test/bedrock/job_queue/consumer/manager_test.exs b/test/bedrock/job_queue/consumer/manager_test.exs index 9135e47..77267e9 100644 --- a/test/bedrock/job_queue/consumer/manager_test.exs +++ b/test/bedrock/job_queue/consumer/manager_test.exs @@ -16,15 +16,51 @@ defmodule Bedrock.JobQueue.Consumer.ManagerTest do @holder_id <<1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16>> defmodule SuccessJob do + @moduledoc false def perform(_args, _meta), do: :ok def timeout, do: 1000 end + defmodule ErrorJob do + @moduledoc false + def perform(_args, _meta), do: {:error, :failed} + def timeout, do: 1000 + end + + defmodule DiscardJob do + @moduledoc false + def perform(_args, _meta), do: {:discard, :bad_payload} + def timeout, do: 1000 + end + + defmodule SnoozeJob do + @moduledoc false + def perform(_args, _meta), do: {:snooze, 123} + def timeout, do: 1000 + end + defmodule CrashingJob do + @moduledoc false def perform(_args, _meta), do: exit(:crash) def timeout, do: 1000 end + defmodule ActionHook do + @moduledoc false + def apply(repo, root, lease, action, handler_result, queue_result, test_pid) do + send(test_pid, {:action_hook, repo, root, lease.item_id, action, handler_result, queue_result}) + :ok + end + end + + defmodule FailingActionHook do + @moduledoc false + def apply(_repo, _root, _lease, _action, _handler_result, _queue_result, test_pid) do + send(test_pid, :failing_action_hook_called) + {:error, :hook_failed} + end + end + setup do pool_name = :"TestPool_#{System.unique_integer()}" {:ok, pool} = Task.Supervisor.start_link(name: pool_name, max_children: 5) @@ -35,6 +71,9 @@ defmodule Bedrock.JobQueue.Consumer.ManagerTest do workers = %{ "test:success" => SuccessJob, + "test:error" => ErrorJob, + "test:discard" => DiscardJob, + "test:snooze" => SnoozeJob, "test:crash" => CrashingJob } @@ -125,5 +164,63 @@ defmodule Bedrock.JobQueue.Consumer.ManagerTest do _ = :sys.get_state(manager) assert Process.alive?(manager) end + + test "runs action hook inside successful queue action", ctx do + item = enqueue_item(ctx, "test:success") + manager = start_manager(ctx, action_hook: {ActionHook, :apply, [self()]}) + + send(manager, {:queue_ready, "tenant_1"}) + + assert_receive {:action_hook, MockRepo, root, item_id, :complete, :ok, :ok} + assert root == ctx.root + assert item_id == item.id + end + + test "runs action hook with requeue action for failed jobs", ctx do + item = enqueue_item(ctx, "test:error") + manager = start_manager(ctx, action_hook: {ActionHook, :apply, [self()]}) + + send(manager, {:queue_ready, "tenant_1"}) + + assert_receive {:action_hook, MockRepo, _root, item_id, :requeue, {:error, :failed}, {:ok, :requeued}} + assert item_id == item.id + end + + test "runs action hook with snooze action for delayed jobs", ctx do + item = enqueue_item(ctx, "test:snooze") + manager = start_manager(ctx, action_hook: {ActionHook, :apply, [self()]}) + + send(manager, {:queue_ready, "tenant_1"}) + + assert_receive {:action_hook, MockRepo, _root, item_id, {:snooze, 123}, {:snooze, 123}, {:ok, :requeued}} + assert item_id == item.id + end + + test "runs action hook with complete action for discarded jobs", ctx do + item = enqueue_item(ctx, "test:discard") + manager = start_manager(ctx, action_hook: {ActionHook, :apply, [self()]}) + + send(manager, {:queue_ready, "tenant_1"}) + + assert_receive {:action_hook, MockRepo, _root, item_id, :complete, {:discard, :bad_payload}, :ok} + assert item_id == item.id + end + + test "rolls back the queue transaction when the action hook fails", ctx do + _item = enqueue_item(ctx, "test:success") + test_pid = self() + + expect(MockRepo, :rollback, fn :hook_failed -> + send(test_pid, {:rollback_called, :hook_failed}) + {:error, :hook_failed} + end) + + manager = start_manager(ctx, action_hook: {FailingActionHook, :apply, [self()]}) + + send(manager, {:queue_ready, "tenant_1"}) + + assert_receive :failing_action_hook_called + assert_receive {:rollback_called, :hook_failed} + end end end diff --git a/test/bedrock/job_queue/consumer_test.exs b/test/bedrock/job_queue/consumer_test.exs new file mode 100644 index 0000000..270a5b5 --- /dev/null +++ b/test/bedrock/job_queue/consumer_test.exs @@ -0,0 +1,68 @@ +defmodule Bedrock.JobQueue.ConsumerTest do + use ExUnit.Case, async: true + + alias Bedrock.JobQueue.Consumer + alias Bedrock.JobQueue.Consumer.Manager + alias Bedrock.JobQueue.Consumer.Scanner + alias Bedrock.Keyspace + + describe "init/1" do + test "forwards timing and lifecycle options to manager and scanner children" do + root = Keyspace.new("job_queue/test/") + backoff_fn = fn attempt -> attempt * 10 end + + assert {:ok, {_flags, children}} = + Consumer.init( + repo: MockRepo, + workers: %{"topic" => __MODULE__}, + root: root, + concurrency: 2, + batch_size: 3, + scan_interval: 4, + lease_duration: 5, + queue_lease_duration: 6, + backoff_fn: backoff_fn, + gc_interval: 7, + gc_grace_period: 8 + ) + + assert task_supervisor_opts(children)[:max_children] == 2 + + manager_opts = child_opts(children, Manager) + assert manager_opts[:repo] == MockRepo + assert manager_opts[:root] == root + assert manager_opts[:workers] == %{"topic" => __MODULE__} + assert manager_opts[:concurrency] == 2 + assert manager_opts[:batch_size] == 3 + assert manager_opts[:lease_duration] == 5 + assert manager_opts[:queue_lease_duration] == 6 + assert manager_opts[:backoff_fn] == backoff_fn + + scanner_opts = child_opts(children, Scanner) + assert scanner_opts[:repo] == MockRepo + assert scanner_opts[:root] == root + assert scanner_opts[:concurrency] == 2 + assert scanner_opts[:batch_size] == 3 + assert scanner_opts[:interval] == 4 + assert scanner_opts[:gc_interval] == 7 + assert scanner_opts[:gc_grace_period] == 8 + end + end + + defp child_opts(children, module) do + assert %{start: {^module, :start_link, [opts]}} = + Enum.find(children, fn child -> child.id == module end) + + opts + end + + defp task_supervisor_opts(children) do + assert %{start: {Task.Supervisor, :start_link, [opts]}} = + Enum.find(children, fn + %{start: {Task.Supervisor, :start_link, [_opts]}} -> true + _ -> false + end) + + opts + end +end diff --git a/test/bedrock/job_queue/store_test.exs b/test/bedrock/job_queue/store_test.exs index dcac988..de2bd58 100644 --- a/test/bedrock/job_queue/store_test.exs +++ b/test/bedrock/job_queue/store_test.exs @@ -9,6 +9,7 @@ defmodule Bedrock.JobQueue.StoreTest do alias Bedrock.JobQueue.Lease alias Bedrock.JobQueue.QueueLease alias Bedrock.JobQueue.Store + alias Bedrock.KeyRange alias Bedrock.Keyspace setup :verify_on_exit! @@ -19,6 +20,17 @@ defmodule Bedrock.JobQueue.StoreTest do :ok end + defmodule RealRepoShape do + @moduledoc false + + def __cluster__, do: :test_cluster + + def get_range({start_key, end_key}, opts) when is_binary(start_key) and is_binary(end_key) do + send(Process.get({__MODULE__, :test_pid}), {:raw_get_range, start_key, end_key, opts}) + [{start_key, :erlang.term_to_binary(Process.get({__MODULE__, :item}))}] + end + end + defp root, do: Keyspace.new("job_queue/") # ============================================================================ @@ -81,6 +93,24 @@ defmodule Bedrock.JobQueue.StoreTest do end describe "peek/4 priority ordering" do + test "scans raw key ranges for real Bedrock repos" do + item = Item.new("tenant_1", "topic", %{}, priority: 10, vesting_time: 1_000) + + Process.put({RealRepoShape, :test_pid}, self()) + Process.put({RealRepoShape, :item}, item) + + keyspaces = Store.queue_keyspaces(root(), "tenant_1") + prefix = Keyspace.prefix(keyspaces.items) + + assert [^item] = Store.peek(RealRepoShape, root(), "tenant_1", limit: 2, now: 2_000) + assert_received {:raw_get_range, start_key, end_key, opts} + assert {start_key, end_key} == KeyRange.from_prefix(prefix) + assert opts[:limit] == 20 + + Process.delete({RealRepoShape, :test_pid}) + Process.delete({RealRepoShape, :item}) + end + test "returns items in priority order (lowest number first)" do # Create items with different priorities high_priority = Item.new("tenant_1", "topic", %{}, priority: 10, vesting_time: 1000) @@ -152,6 +182,18 @@ defmodule Bedrock.JobQueue.StoreTest do refute Item.visible?(item, now) end + + test "items with expired leases are visible for recovery" do + now = 10_000 + + item = %{ + Item.new("queue", "topic", %{}, vesting_time: 9_000) + | lease_id: <<1, 2, 3>>, + lease_expires_at: 9_000 + } + + assert Item.visible?(item, now) + end end describe "Lease creation" do diff --git a/test/bedrock/job_queue/supervisor_test.exs b/test/bedrock/job_queue/supervisor_test.exs index 8a1d88a..8f6df6a 100644 --- a/test/bedrock/job_queue/supervisor_test.exs +++ b/test/bedrock/job_queue/supervisor_test.exs @@ -57,6 +57,15 @@ defmodule Bedrock.JobQueue.SupervisorTest do Supervisor.stop(pid) :persistent_term.erase({Bedrock.JobQueue.Internal, TestJobQueue}) end + + test "accepts a precomputed root keyspace without initializing the directory" do + root = Keyspace.new("job_queue/precomputed/") + + assert {:ok, pid} = JQSupervisor.start_link(TestJobQueue, root: root) + assert Process.alive?(pid) + + Supervisor.stop(pid) + end end describe "init/1" do @@ -71,5 +80,38 @@ defmodule Bedrock.JobQueue.SupervisorTest do [child] = children assert child.id == Bedrock.JobQueue.Consumer end + + test "forwards consumer runtime options" do + root = Keyspace.new("job_queue/test/") + backoff_fn = fn attempt -> attempt * 100 end + + opts = [ + action_hook: fn -> :ok end, + concurrency: 2, + batch_size: 3, + scan_interval: 4, + lease_duration: 5, + queue_lease_duration: 6, + backoff_fn: backoff_fn, + gc_interval: 7, + gc_grace_period: 8 + ] + + assert {:ok, {_sup_flags, [child]}} = JQSupervisor.init({TestJobQueue, root, opts}) + assert %{start: {Bedrock.JobQueue.Consumer, :start_link, [consumer_opts]}} = child + + assert consumer_opts[:repo] == MockRepo + assert consumer_opts[:root] == root + assert consumer_opts[:workers] == %{"test:job" => TestJobQueue} + assert is_function(consumer_opts[:action_hook], 0) + assert consumer_opts[:concurrency] == 2 + assert consumer_opts[:batch_size] == 3 + assert consumer_opts[:scan_interval] == 4 + assert consumer_opts[:lease_duration] == 5 + assert consumer_opts[:queue_lease_duration] == 6 + assert consumer_opts[:backoff_fn] == backoff_fn + assert consumer_opts[:gc_interval] == 7 + assert consumer_opts[:gc_grace_period] == 8 + end end end