From 9d3fd3f3a78b0c6dda2ddeb9b8731f9801dddc1d Mon Sep 17 00:00:00 2001 From: Grace Yanagida Date: Thu, 16 Sep 2021 13:23:18 -0400 Subject: [PATCH 1/9] Persisting worklist using CubQ --- README.md | 3 + lib/jackalope.ex | 8 +- lib/jackalope/persistent_work_list.ex | 357 ++++++++++++++++++++++++++ mix.exs | 1 + mix.lock | 2 + test/jackalope_test.exs | 8 +- 6 files changed, 374 insertions(+), 5 deletions(-) create mode 100644 lib/jackalope/persistent_work_list.ex diff --git a/README.md b/README.md index 24eb9a7..ab42931 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,9 @@ to discarded when confirmed by Tortoise311 as processed or when they are expired The work list has a maximum size which defaults to 100. Only a maximum number of publish commands can wait, should Tortoise311 be temporarily disconnected, to be forwarded to Tortoise311. +The waiting and pending publish commands can survive a reboot but only if the persistent work list +implementation is used. The default is the transient work list implementation. + You can set the Jackalope.start_link/1 `:work_list_mod` option to the desired work list implementation. See the documentation for module `Jackalope`. diff --git a/lib/jackalope.ex b/lib/jackalope.ex index ac57081..b17d9b7 100644 --- a/lib/jackalope.ex +++ b/lib/jackalope.ex @@ -78,7 +78,10 @@ defmodule Jackalope do - `work_list_mod` names the module implementing the Jackalope WorkList protocol that will be used to manage the publish commands sent to Tortoise by the Jackalope Session. The module must also implement the function `@spec new(function(), function(), non_neg_integer(), Keyword.t()) :: any()`. - See Jackalope.TransientWorkList (the default) for examples. + See Jackalope.TransientWorkList (the default) and Jackalope.PersistentWorkList for examples. + + - `data_dir` (defaults to the Nerves-friendly "/data/jackalope") is the directory used by PersistentWorkList + (if used) to persist the waiting-to-be-sent and pending-confirmation publish commands. - `max_work_list_size` (default: #{@default_max_work_list_size}) specifies the maximum number of unexpired work orders Jackalope will retain in its work list @@ -126,7 +129,8 @@ defmodule Jackalope do [ handler: jackalope_handler, max_work_list_size: max_work_list_size, - work_list_mod: work_list_mod + work_list_mod: work_list_mod, + data_dir: Keyword.get(opts, :data_dir) ]}, {Jackalope.Supervisor, [ diff --git a/lib/jackalope/persistent_work_list.ex b/lib/jackalope/persistent_work_list.ex new file mode 100644 index 0000000..145e5ff --- /dev/null +++ b/lib/jackalope/persistent_work_list.ex @@ -0,0 +1,357 @@ +defmodule Jackalope.PersistentWorkList do + @moduledoc """ + A genserver wrapper for CubQ which we leverage to store and restore worklist tasks during disconnections + """ + use GenServer + + alias Jackalope.WorkList.Expiration + + require Logger + + @default_max_size 100 + + @tick_delay 10 * 60 * 1_000 + + defmodule State do + @moduledoc false + defstruct db: nil, + queue: nil, + queue_name: nil, + max_work_list_size: nil, + expiration_fn: nil, + update_expiration_fn: nil + end + + @doc "Create a new work list" + @spec new(function(), function(), non_neg_integer(), Keyword.t()) :: pid() + def new(expiration_fn, update_expiration_fn, max_size \\ @default_max_size, opts \\ []) do + args = [ + expiration_fn: expiration_fn, + update_expiration_fn: update_expiration_fn, + max_size: max_size, + opts: opts + ] + + Logger.info("[Jackalope] Starting #{__MODULE__} with #{inspect(opts)}") + {:ok, pid} = GenServer.start_link(__MODULE__, args) + pid + end + + @impl GenServer + def init(args) do + opts = Keyword.fetch!(args, :opts) + data_dir = Keyword.get(opts, :data_dir, "/data/jackalope") + queue_name = Keyword.get(opts, :queue_name, :items) + + db = + case CubDB.start_link(data_dir: data_dir, name: :work_list, auto_compact: true) do + {:ok, pid} -> + pid + + {:error, {:already_started, pid}} -> + pid + + other -> + Logger.warn("[Jackalope] Corrupted DB : #{inspect(other)}. Erasing it.") + _ = File.rmdir(data_dir) + raise "Corrupted work list DB" + end + + CubDB.set_auto_file_sync(db, true) + + queue = + case CubQ.start_link(db: db, queue: queue_name) do + {:ok, pid} -> + pid + + {:error, {:already_started, pid}} -> + pid + + other -> + Logger.warn("[Jackalope] Corrupted queue : #{inspect(other)}. Erasing DB.") + _ = File.rmdir(data_dir) + raise "Corrupted work list queue" + end + + send(self(), :tick) + + {:ok, + %State{ + db: db, + queue: queue, + queue_name: queue_name, + max_work_list_size: Keyword.fetch!(args, :max_size), + expiration_fn: Keyword.fetch!(args, :expiration_fn), + update_expiration_fn: Keyword.fetch!(args, :update_expiration_fn) + }, {:continue, :recover}} + end + + @impl GenServer + def handle_continue(:recover, state) do + recover(state, state.expiration_fn, state.update_expiration_fn) + CubDB.put(state.db, :pending, %{}) + {:noreply, state} + end + + @impl GenServer + def handle_info(:tick, state) do + :ok = record_time_now(state) + Process.send_after(self(), :tick, @tick_delay) + {:noreply, state} + end + + @impl GenServer + def handle_call(:count, _from, state) do + {:reply, queue_size(state), state} + end + + def handle_call(:count_pending, _from, state) do + {:reply, Enum.count(get_pending(state)), state} + end + + def handle_call(:peek, _from, state) do + peek = + case CubQ.peek_last(state.queue) do + nil -> nil + {:ok, item} -> item + end + + {:reply, peek, state} + end + + def handle_call({:done, ref}, _from, state) do + pending = get_pending(state) + {item, updated_pending} = Map.pop(pending, ref) + :ok = update_pending(state, updated_pending) + {:reply, item, state} + end + + def handle_call(:remove_all, _from, state) do + :ok = CubQ.delete_all(state.queue) + :ok = update_pending(state, %{}) + {:reply, :ok, state} + end + + def handle_call(:pop, _from, state) do + {:ok, _item} = CubQ.pop(state.queue) + {:reply, :ok, state} + end + + def handle_call({:push, item}, _from, state) do + :ok = CubQ.push(state.queue, item) + {:reply, :ok, bound_work_items(state)} + end + + def handle_call({:pending, ref}, _from, state) do + {:ok, item} = CubQ.pop(state.queue) + + add_pending(state, item, ref) + + {:reply, :ok, state} + end + + def handle_call(:reset_pending, _from, state) do + pending_items = pending_items(state) + + :ok = Enum.each(pending_items, &(:ok = CubQ.push(state.queue, &1))) + + :ok = update_pending(state, %{}) + + {:reply, :ok, bound_work_items(state)} + end + + @impl GenServer + def terminate(_reason, state) do + record_time_now(state) + end + + defp add_pending(state, item, ref) do + updated_pending = get_pending(state) |> Map.put(ref, item) |> bound_pending_items(state) + + :ok = update_pending(state, updated_pending) + end + + defp pending_items(state), do: get_pending(state) |> Map.values() + + defp bound_work_items(state) do + max = state.max_work_list_size + + if queue_size(state) > max do + :ok = remove_expired_work_items(state) + excess = queue_size(state) - max + + _ = if excess >= 0, do: remove_excess(excess, state) + end + + state + end + + defp remove_excess(excess, state) do + Enum.each( + 1..excess, + fn _i -> + {:ok, item_removed} = CubQ.dequeue(state.queue) + + Logger.info( + "[Jackalope] WorkList - The worklist still exceeds max size. #{inspect(item_removed)} was removed from the queue." + ) + end + ) + end + + defp remove_expired_work_items(state) do + Enum.each( + 1..queue_size(state), + fn _i -> + # remove from begining + {:ok, item} = CubQ.dequeue(state.queue) + + if Expiration.unexpired?(item, state.expiration_fn) do + # same as push (insert at end) + :ok = CubQ.enqueue(state.queue, item) + else + Logger.debug( + "[Jackalope] #{inspect(item)} removed from the queue due to expiration. Size is #{queue_size(state)}" + ) + end + end + ) + end + + # Trim pending as needed to accomodate an additional pending item + defp bound_pending_items(pending, state) do + if map_size(pending) > state.max_work_list_size do + # Trim expired pending requests + kept_pairs = + Enum.reduce( + pending, + [], + fn {ref, item}, acc -> + if Expiration.unexpired?(item, state.expiration_fn) do + [{ref, item} | acc] + else + acc + end + end + ) + + # If still over maximum, remove the oldest pending request (expiration is smallest) + if length(kept_pairs) > state.max_work_list_size do + [{ref, item} | newer_pairs] = + Enum.sort(kept_pairs, fn {_, item1}, {_, item2} -> + Expiration.after?(state.expiration_fn.(item2), state.expiration_fn.(item1)) + end) + + Logger.info( + "[Jackalope] Maximum number of unexpired pending requests reached. Dropping #{inspect(item)}:#{inspect(ref)}." + ) + + newer_pairs + else + kept_pairs + end + |> Enum.into(%{}) + else + pending + end + end + + defp get_pending(state), do: CubDB.get(state.db, :pending) || %{} + + defp update_pending(state, updated_pending), + do: :ok = CubDB.put(state.db, :pending, updated_pending) + + defp queue_size(state), do: CubDB.size(state.db) - 2 + + defp record_time_now(state) do + :ok = CubDB.put(state.db, :latest_time, Expiration.now()) + end + + # After restart, recover and rebase the persisted items from the previous to the new system monotonic time. + # Assumes that restart occurred shortly after shutdown, or this is the first time the work list was created. + defp recover(state, expiration_fn, update_expiration_fn) do + # latest_time is nil if this is a never-persisted work list + latest_time = CubDB.get(state.db, :latest_time) + now = Expiration.now() + # Rebase the expiration of queued (waiting) items + items_count = queue_size(state) + + if items_count > 0 do + Enum.each( + 1..items_count, + fn _i -> + {:ok, waiting_item} = CubQ.dequeue(state.queue) + + expiration = + Expiration.rebase_expiration(expiration_fn.(waiting_item), latest_time, now) + + :ok = CubQ.enqueue(state.queue, update_expiration_fn.(waiting_item, expiration)) + end + ) + end + + # Convert pending items into waiting items with rebased expirations + + pending_items(state) + |> Enum.each(fn pending_item -> + expiration = Expiration.rebase_expiration(expiration_fn.(pending_item), latest_time, now) + :ok = CubQ.enqueue(state.queue, update_expiration_fn.(pending_item, expiration)) + end) + end +end + +defimpl Jackalope.WorkList, for: PID do + @impl Jackalope.WorkList + def peek(work_list) do + GenServer.call(work_list, :peek) + end + + @impl Jackalope.WorkList + def push(work_list, item) do + :ok = GenServer.call(work_list, {:push, item}) + work_list + end + + @impl Jackalope.WorkList + def pop(work_list) do + :ok = GenServer.call(work_list, :pop) + work_list + end + + @impl Jackalope.WorkList + def pending(work_list, ref) do + :ok = GenServer.call(work_list, {:pending, ref}) + work_list + end + + @impl Jackalope.WorkList + def reset_pending(work_list) do + :ok = GenServer.call(work_list, :reset_pending) + work_list + end + + @impl Jackalope.WorkList + def done(work_list, ref) do + item = GenServer.call(work_list, {:done, ref}) + {work_list, item} + end + + @impl Jackalope.WorkList + def count(work_list) do + GenServer.call(work_list, :count) + end + + @impl Jackalope.WorkList + def count_pending(work_list) do + GenServer.call(work_list, :count_pending) + end + + @impl Jackalope.WorkList + def empty?(work_list), do: peek(work_list) == nil + + @impl Jackalope.WorkList + def remove_all(work_list) do + :ok = GenServer.call(work_list, :remove_all) + work_list + end +end diff --git a/mix.exs b/mix.exs index 485758c..5d58ab6 100644 --- a/mix.exs +++ b/mix.exs @@ -40,6 +40,7 @@ defmodule Jackalope.MixProject do {:jason, "~> 1.1"}, {:dialyxir, "~> 1.1.0", only: [:test, :dev], runtime: false}, {:credo, "~> 1.4", only: [:dev, :test], runtime: false}, + {:cubq, "~> 0.3.0"}, {:ex_doc, "~> 0.22", only: :docs, runtime: false} ] end diff --git a/mix.lock b/mix.lock index f8444ed..51451e8 100644 --- a/mix.lock +++ b/mix.lock @@ -5,6 +5,8 @@ "earmark_parser": {:hex, :earmark_parser, "1.4.18", "e1b2be73eb08a49fb032a0208bf647380682374a725dfb5b9e510def8397f6f2", [:mix], [], "hexpm", "114a0e85ec3cf9e04b811009e73c206394ffecfcc313e0b346de0d557774ee97"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.26.0", "1922164bac0b18b02f84d6f69cab1b93bc3e870e2ad18d5dacb50a9e06b542a3", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2775d66e494a9a48355db7867478ffd997864c61c65a47d31c4949459281c78d"}, + "cubdb": {:hex, :cubdb, "1.0.0", "db421b93b6353cda41d33f9dfbe62c58edf915fb33337b96037b32d9147bd823", [:mix], [], "hexpm", "11e2c494c483b3b77d8fa5fc8a72b245ed49845f354790bc82917693b0e33517"}, + "cubq": {:hex, :cubq, "0.3.0", "56d7ddad66e9b791bd4c79253bcfdb874dfed602c1a867769d944f006e4c3462", [:mix], [{:cubdb, "~> 0.17 or ~> 1.0", [hex: :cubdb, repo: "hexpm", optional: false]}], "hexpm", "c5ccaa6eb92442b8210a549d1a060f59cc6013f1c8aec52dcfcfaa260e7cf0b9"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, diff --git a/test/jackalope_test.exs b/test/jackalope_test.exs index 3c1b53c..32d74fe 100644 --- a/test/jackalope_test.exs +++ b/test/jackalope_test.exs @@ -7,7 +7,7 @@ defmodule JackalopeTest do alias JackalopeTest.ScriptedMqttServer, as: MqttServer alias Tortoise311.Package - @work_list_mod Jackalope.TransientWorkList + @work_list_mod Jackalope.PersistentWorkList setup context do {:ok, mqtt_server_pid} = start_supervised(MqttServer) @@ -25,7 +25,8 @@ defmodule JackalopeTest do server: transport, client_id: context.client_id, handler: JackalopeTest.TestHandler, - work_list_mod: @work_list_mod + work_list_mod: @work_list_mod, + data_dir: "/tmp" ) assert_receive {MqttServer, {:received, %Package.Connect{}}} @@ -213,7 +214,8 @@ defmodule JackalopeTest do handler: handler, initial_topics: initial_topics, max_work_list_size: max_work_list_size, - work_list_mod: @work_list_mod + work_list_mod: @work_list_mod, + data_dir: "/tmp" ]} ) From eab929545706c49b213526aa5d982418adfac8e2 Mon Sep 17 00:00:00 2001 From: Frank Hunleth Date: Wed, 19 Jan 2022 15:37:29 -0500 Subject: [PATCH 2/9] Bump deps to latest --- mix.lock | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/mix.lock b/mix.lock index 51451e8..82cecac 100644 --- a/mix.lock +++ b/mix.lock @@ -1,18 +1,18 @@ %{ "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, - "credo": {:hex, :credo, "1.6.1", "7dc76dcdb764a4316c1596804c48eada9fff44bd4b733a91ccbf0c0f368be61e", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "698607fb5993720c7e93d2d8e76f2175bba024de964e160e2f7151ef3ab82ac5"}, + "credo": {:hex, :credo, "1.6.2", "2f82b29a47c0bb7b72f023bf3a34d151624f1cbe1e6c4e52303b05a11166a701", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "ae9dc112bc368e7b145c547bec2ed257ef88955851c15057c7835251a17211c6"}, + "cubdb": {:hex, :cubdb, "1.1.0", "c1ad1816e3f343eb36c80d6700eca4d8cdf668e40d39f15228eb552e0afbc0d8", [:mix], [], "hexpm", "0fb5ac6b825b4b3ed6482aade8e5b8e761f170e010657d71a308d515aaf718a0"}, + "cubq": {:hex, :cubq, "0.3.0", "56d7ddad66e9b791bd4c79253bcfdb874dfed602c1a867769d944f006e4c3462", [:mix], [{:cubdb, "~> 0.17 or ~> 1.0", [hex: :cubdb, repo: "hexpm", optional: false]}], "hexpm", "c5ccaa6eb92442b8210a549d1a060f59cc6013f1c8aec52dcfcfaa260e7cf0b9"}, "dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.18", "e1b2be73eb08a49fb032a0208bf647380682374a725dfb5b9e510def8397f6f2", [:mix], [], "hexpm", "114a0e85ec3cf9e04b811009e73c206394ffecfcc313e0b346de0d557774ee97"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.19", "de0d033d5ff9fc396a24eadc2fcf2afa3d120841eb3f1004d138cbf9273210e8", [:mix], [], "hexpm", "527ab6630b5c75c3a3960b75844c314ec305c76d9899bb30f71cb85952a9dc45"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.26.0", "1922164bac0b18b02f84d6f69cab1b93bc3e870e2ad18d5dacb50a9e06b542a3", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2775d66e494a9a48355db7867478ffd997864c61c65a47d31c4949459281c78d"}, - "cubdb": {:hex, :cubdb, "1.0.0", "db421b93b6353cda41d33f9dfbe62c58edf915fb33337b96037b32d9147bd823", [:mix], [], "hexpm", "11e2c494c483b3b77d8fa5fc8a72b245ed49845f354790bc82917693b0e33517"}, - "cubq": {:hex, :cubq, "0.3.0", "56d7ddad66e9b791bd4c79253bcfdb874dfed602c1a867769d944f006e4c3462", [:mix], [{:cubdb, "~> 0.17 or ~> 1.0", [hex: :cubdb, repo: "hexpm", optional: false]}], "hexpm", "c5ccaa6eb92442b8210a549d1a060f59cc6013f1c8aec52dcfcfaa260e7cf0b9"}, + "ex_doc": {:hex, :ex_doc, "0.27.3", "d09ed7ab590b71123959d9017f6715b54a448d76b43cf909eb0b2e5a78a977b2", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "ee60b329d08195039bfeb25231a208749be4f2274eae42ce38f9be0538a2f2e6"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "gen_state_machine": {:hex, :gen_state_machine, "3.0.0", "1e57f86a494e5c6b14137ebef26a7eb342b3b0070c7135f2d6768ed3f6b6cdff", [:mix], [], "hexpm", "0a59652574bebceb7309f6b749d2a41b45fdeda8dbb4da0791e355dd19f0ed15"}, - "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, + "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, "makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "nimble_parsec": {:hex, :nimble_parsec, "1.2.0", "b44d75e2a6542dcb6acf5d71c32c74ca88960421b6874777f79153bbbbd7dccc", [:mix], [], "hexpm", "52b2871a7515a5ac49b00f214e4165a40724cf99798d8e4a65e4fd64ebd002c1"}, - "tortoise311": {:hex, :tortoise311, "0.11.1", "2215a663062db76b8cb03e9da0b172e9a6e5e7ddb77c73428913df34516c904a", [:mix], [{:gen_state_machine, "~> 2.0 or ~> 3.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}], "hexpm", "9c89eb0e08db554c4e417ebeacbaf28c6b32b9cd65ca49bf5444f7d598188ef7"}, + "tortoise311": {:hex, :tortoise311, "0.11.2", "7985918958ebc525730725a87346c53f8cab5075207d3eb62624c22a800b062a", [:mix], [{:gen_state_machine, "~> 2.0 or ~> 3.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}], "hexpm", "a016359b09a51226b0e20ebd10c2e88e28be37ba679841aff7cc3c12e7cbea2b"}, } From 261be8454a4c5952191b12980cc4852d33211053 Mon Sep 17 00:00:00 2001 From: Frank Hunleth Date: Wed, 19 Jan 2022 15:37:46 -0500 Subject: [PATCH 3/9] Test both transient and persistent work lists --- test/jackalope_test.exs | 183 ++++++++++++++++++++-------------------- 1 file changed, 92 insertions(+), 91 deletions(-) diff --git a/test/jackalope_test.exs b/test/jackalope_test.exs index 32d74fe..8a3e63d 100644 --- a/test/jackalope_test.exs +++ b/test/jackalope_test.exs @@ -7,8 +7,6 @@ defmodule JackalopeTest do alias JackalopeTest.ScriptedMqttServer, as: MqttServer alias Tortoise311.Package - @work_list_mod Jackalope.PersistentWorkList - setup context do {:ok, mqtt_server_pid} = start_supervised(MqttServer) Process.link(mqtt_server_pid) @@ -25,7 +23,7 @@ defmodule JackalopeTest do server: transport, client_id: context.client_id, handler: JackalopeTest.TestHandler, - work_list_mod: @work_list_mod, + work_list_mod: Jackalope.TransientWorkList, data_dir: "/tmp" ) @@ -40,7 +38,7 @@ defmodule JackalopeTest do describe "publish/3" do test "publish with QoS=0", context do - connect(context) + connect(context, work_list_mod: Jackalope.TransientWorkList) flush = expect_publish( @@ -57,7 +55,7 @@ defmodule JackalopeTest do end test "publish with QoS=1", context do - connect(context) + connect(context, work_list_mod: Jackalope.TransientWorkList) flush = expect_publish( @@ -80,91 +78,93 @@ defmodule JackalopeTest do end describe "work list" do - test "dropping work orders", context do - connect(context, max_work_list_size: 10) - pause_mqtt_server(context) - - work_list = get_session_work_list() - - work_list = - Enum.reduce(1..15, work_list, fn i, acc -> - WorkList.push( - acc, - {{:publish, "foo", %{"msg" => "hello #{i}"}, [qos: 1]}, - [expiration: Expiration.expiration(:infinity)]} - ) - end) - - assert WorkList.count(work_list) == 10 - end - - test "pending and done work items", context do - connect(context, max_work_list_size: 10) - pause_mqtt_server(context) - - work_list = get_session_work_list() - - work_list = - Enum.reduce(1..5, work_list, fn i, acc -> - WorkList.push( - acc, - {{:publish, "foo", %{"msg" => "hello #{i}"}, [qos: 1]}, - [expiration: Expiration.expiration(:infinity)]} - ) - end) - - assert WorkList.count(work_list) == 5 - - ref = make_ref() - - {work_list, _item} = - work_list - |> WorkList.pending(ref) - |> WorkList.done(ref) - - assert WorkList.count(work_list) == 4 - end - - test "dropping pending work items", context do - connect(context, max_work_list_size: 10) - pause_mqtt_server(context) - - work_list = get_session_work_list() - - work_list = - Enum.reduce(1..15, work_list, fn i, acc -> - WorkList.push( - acc, - {{:publish, "foo", %{"msg" => "hello #{i}"}, [qos: 1]}, - [expiration: Expiration.expiration(:infinity)]} - ) - |> WorkList.pending(make_ref()) - end) - - assert WorkList.count_pending(work_list) == 10 - end - - test "reset_pending work items", context do - connect(context, max_work_list_size: 10) - pause_mqtt_server(context) - - work_list = get_session_work_list() - - work_list = - Enum.reduce(1..5, work_list, fn i, acc -> - WorkList.push( - acc, - {{:publish, "foo", %{"msg" => "hello #{i}"}, [qos: 1]}, - [expiration: Expiration.expiration(:infinity)]} - ) - end) - - ref = make_ref() - - work_list = WorkList.pending(work_list, ref) - assert WorkList.count(work_list) == 4 - work_list = WorkList.reset_pending(work_list) - assert WorkList.count(work_list) == 5 + for work_list_mod <- [Jackalope.PersistentWorkList, Jackalope.TransientWorkList] do + test "#{work_list_mod}: dropping work orders", context do + connect(context, work_list_mod: unquote(work_list_mod), max_work_list_size: 10) + pause_mqtt_server(context) + + work_list = get_session_work_list() + + work_list = + Enum.reduce(1..15, work_list, fn i, acc -> + WorkList.push( + acc, + {{:publish, "foo", %{"msg" => "hello #{i}"}, [qos: 1]}, + [expiration: Expiration.expiration(:infinity)]} + ) + end) + + assert WorkList.count(work_list) == 10 + end + + test "#{work_list_mod}: pending and done work items", context do + connect(context, work_list_mod: unquote(work_list_mod), max_work_list_size: 10) + pause_mqtt_server(context) + + work_list = get_session_work_list() + + work_list = + Enum.reduce(1..5, work_list, fn i, acc -> + WorkList.push( + acc, + {{:publish, "foo", %{"msg" => "hello #{i}"}, [qos: 1]}, + [expiration: Expiration.expiration(:infinity)]} + ) + end) + + assert WorkList.count(work_list) == 5 + + ref = make_ref() + + {work_list, _item} = + work_list + |> WorkList.pending(ref) + |> WorkList.done(ref) + + assert WorkList.count(work_list) == 4 + end + + test "#{work_list_mod}: dropping pending work items", context do + connect(context, work_list_mod: unquote(work_list_mod), max_work_list_size: 10) + pause_mqtt_server(context) + + work_list = get_session_work_list() + + work_list = + Enum.reduce(1..15, work_list, fn i, acc -> + WorkList.push( + acc, + {{:publish, "foo", %{"msg" => "hello #{i}"}, [qos: 1]}, + [expiration: Expiration.expiration(:infinity)]} + ) + |> WorkList.pending(make_ref()) + end) + + assert WorkList.count_pending(work_list) == 10 + end + + test "#{work_list_mod}: reset_pending work items", context do + connect(context, work_list_mod: unquote(work_list_mod), max_work_list_size: 10) + pause_mqtt_server(context) + + work_list = get_session_work_list() + + work_list = + Enum.reduce(1..5, work_list, fn i, acc -> + WorkList.push( + acc, + {{:publish, "foo", %{"msg" => "hello #{i}"}, [qos: 1]}, + [expiration: Expiration.expiration(:infinity)]} + ) + end) + + ref = make_ref() + + work_list = WorkList.pending(work_list, ref) + assert WorkList.count(work_list) == 4 + work_list = WorkList.reset_pending(work_list) + assert WorkList.count(work_list) == 5 + end end test "rebasing expiration" do @@ -205,6 +205,7 @@ defmodule JackalopeTest do handler = Keyword.get(opts, :handler, JackalopeTest.TestHandler) initial_topics = Keyword.get(opts, :initial_topics) max_work_list_size = Keyword.get(opts, :max_work_list_size, 100) + work_list_mod = Keyword.fetch!(opts, :work_list_mod) start_supervised!( {Jackalope, @@ -214,7 +215,7 @@ defmodule JackalopeTest do handler: handler, initial_topics: initial_topics, max_work_list_size: max_work_list_size, - work_list_mod: @work_list_mod, + work_list_mod: work_list_mod, data_dir: "/tmp" ]} ) From e69114b4363bccc3054013d6cb7cd4ff392d5741 Mon Sep 17 00:00:00 2001 From: Frank Hunleth Date: Thu, 20 Jan 2022 14:10:08 -0500 Subject: [PATCH 4/9] Fix spelling --- lib/jackalope/persistent_work_list.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/jackalope/persistent_work_list.ex b/lib/jackalope/persistent_work_list.ex index 145e5ff..1b5e47a 100644 --- a/lib/jackalope/persistent_work_list.ex +++ b/lib/jackalope/persistent_work_list.ex @@ -203,7 +203,7 @@ defmodule Jackalope.PersistentWorkList do Enum.each( 1..queue_size(state), fn _i -> - # remove from begining + # remove from beginning {:ok, item} = CubQ.dequeue(state.queue) if Expiration.unexpired?(item, state.expiration_fn) do @@ -218,7 +218,7 @@ defmodule Jackalope.PersistentWorkList do ) end - # Trim pending as needed to accomodate an additional pending item + # Trim pending as needed to accommodate an additional pending item defp bound_pending_items(pending, state) do if map_size(pending) > state.max_work_list_size do # Trim expired pending requests From 0afdaaf900847456630332d34e915bee73d14b18 Mon Sep 17 00:00:00 2001 From: Frank Hunleth Date: Thu, 20 Jan 2022 14:11:28 -0500 Subject: [PATCH 5/9] Remove parameterizable queue_name --- lib/jackalope/persistent_work_list.ex | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/jackalope/persistent_work_list.ex b/lib/jackalope/persistent_work_list.ex index 1b5e47a..5e0550d 100644 --- a/lib/jackalope/persistent_work_list.ex +++ b/lib/jackalope/persistent_work_list.ex @@ -16,7 +16,6 @@ defmodule Jackalope.PersistentWorkList do @moduledoc false defstruct db: nil, queue: nil, - queue_name: nil, max_work_list_size: nil, expiration_fn: nil, update_expiration_fn: nil @@ -41,7 +40,6 @@ defmodule Jackalope.PersistentWorkList do def init(args) do opts = Keyword.fetch!(args, :opts) data_dir = Keyword.get(opts, :data_dir, "/data/jackalope") - queue_name = Keyword.get(opts, :queue_name, :items) db = case CubDB.start_link(data_dir: data_dir, name: :work_list, auto_compact: true) do @@ -60,7 +58,7 @@ defmodule Jackalope.PersistentWorkList do CubDB.set_auto_file_sync(db, true) queue = - case CubQ.start_link(db: db, queue: queue_name) do + case CubQ.start_link(db: db, queue: :items) do {:ok, pid} -> pid @@ -79,7 +77,6 @@ defmodule Jackalope.PersistentWorkList do %State{ db: db, queue: queue, - queue_name: queue_name, max_work_list_size: Keyword.fetch!(args, :max_size), expiration_fn: Keyword.fetch!(args, :expiration_fn), update_expiration_fn: Keyword.fetch!(args, :update_expiration_fn) From ecb7896ff26e7156aee5bfcb8e7c4b7196188ec5 Mon Sep 17 00:00:00 2001 From: Jean-Francois Cloutier Date: Thu, 20 Jan 2022 14:26:18 -0500 Subject: [PATCH 6/9] Remove DB file on CubDB.start_link error and retry --- lib/jackalope/persistent_work_list.ex | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/jackalope/persistent_work_list.ex b/lib/jackalope/persistent_work_list.ex index 5e0550d..9ec6028 100644 --- a/lib/jackalope/persistent_work_list.ex +++ b/lib/jackalope/persistent_work_list.ex @@ -41,18 +41,17 @@ defmodule Jackalope.PersistentWorkList do opts = Keyword.fetch!(args, :opts) data_dir = Keyword.get(opts, :data_dir, "/data/jackalope") - db = - case CubDB.start_link(data_dir: data_dir, name: :work_list, auto_compact: true) do - {:ok, pid} -> - pid + cubdb_opts = [data_dir: data_dir, name: :work_list, auto_compact: true] - {:error, {:already_started, pid}} -> - pid - - other -> - Logger.warn("[Jackalope] Corrupted DB : #{inspect(other)}. Erasing it.") + {:ok, db} = + case CubDB.start_link(cubdb_opts) do + {:error, reason} -> + Logger.warn("[Jackalope] Corrupted DB : #{inspect(reason)}. Erasing it.") _ = File.rmdir(data_dir) - raise "Corrupted work list DB" + CubDB.start_link(cubdb_opts) + + success -> + success end CubDB.set_auto_file_sync(db, true) From 28fdf523db9e256935912e0b86ee63995a3e4e96 Mon Sep 17 00:00:00 2001 From: Jean-Francois Cloutier Date: Thu, 20 Jan 2022 14:45:35 -0500 Subject: [PATCH 7/9] Helper function to start the CubQ queue --- lib/jackalope/persistent_work_list.ex | 67 ++++++++++++++------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/lib/jackalope/persistent_work_list.ex b/lib/jackalope/persistent_work_list.ex index 9ec6028..3f98d8d 100644 --- a/lib/jackalope/persistent_work_list.ex +++ b/lib/jackalope/persistent_work_list.ex @@ -39,37 +39,7 @@ defmodule Jackalope.PersistentWorkList do @impl GenServer def init(args) do opts = Keyword.fetch!(args, :opts) - data_dir = Keyword.get(opts, :data_dir, "/data/jackalope") - - cubdb_opts = [data_dir: data_dir, name: :work_list, auto_compact: true] - - {:ok, db} = - case CubDB.start_link(cubdb_opts) do - {:error, reason} -> - Logger.warn("[Jackalope] Corrupted DB : #{inspect(reason)}. Erasing it.") - _ = File.rmdir(data_dir) - CubDB.start_link(cubdb_opts) - - success -> - success - end - - CubDB.set_auto_file_sync(db, true) - - queue = - case CubQ.start_link(db: db, queue: :items) do - {:ok, pid} -> - pid - - {:error, {:already_started, pid}} -> - pid - - other -> - Logger.warn("[Jackalope] Corrupted queue : #{inspect(other)}. Erasing DB.") - _ = File.rmdir(data_dir) - raise "Corrupted work list queue" - end - + {db, queue} = start_queue(opts) send(self(), :tick) {:ok, @@ -161,6 +131,41 @@ defmodule Jackalope.PersistentWorkList do record_time_now(state) end + defp start_queue(opts) do + data_dir = Keyword.get(opts, :data_dir, "/data/jackalope") + + cubdb_opts = [data_dir: data_dir, name: :work_list, auto_compact: true] + + {:ok, db} = + case CubDB.start_link(cubdb_opts) do + {:error, reason} -> + Logger.warn("[Jackalope] Corrupted DB : #{inspect(reason)}. Erasing it.") + _ = File.rmdir(data_dir) + CubDB.start_link(cubdb_opts) + + success -> + success + end + + CubDB.set_auto_file_sync(db, true) + + queue = + case CubQ.start_link(db: db, queue: :items) do + {:ok, pid} -> + pid + + {:error, {:already_started, pid}} -> + pid + + other -> + Logger.warn("[Jackalope] Corrupted queue : #{inspect(other)}. Erasing DB.") + _ = File.rmdir(data_dir) + raise "Corrupted work list queue" + end + + {db, queue} + end + defp add_pending(state, item, ref) do updated_pending = get_pending(state) |> Map.put(ref, item) |> bound_pending_items(state) From dc00501ad3b4d7ba0a0f054d601223753a0d91f5 Mon Sep 17 00:00:00 2001 From: Jean-Francois Cloutier Date: Thu, 20 Jan 2022 14:59:49 -0500 Subject: [PATCH 8/9] Combine all work list opts --- lib/jackalope/persistent_work_list.ex | 22 +++++++--------------- lib/jackalope/session.ex | 12 +++++++----- lib/jackalope/transient_work_list.ex | 10 ++++++---- 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/lib/jackalope/persistent_work_list.ex b/lib/jackalope/persistent_work_list.ex index 3f98d8d..ad749e6 100644 --- a/lib/jackalope/persistent_work_list.ex +++ b/lib/jackalope/persistent_work_list.ex @@ -22,23 +22,15 @@ defmodule Jackalope.PersistentWorkList do end @doc "Create a new work list" - @spec new(function(), function(), non_neg_integer(), Keyword.t()) :: pid() - def new(expiration_fn, update_expiration_fn, max_size \\ @default_max_size, opts \\ []) do - args = [ - expiration_fn: expiration_fn, - update_expiration_fn: update_expiration_fn, - max_size: max_size, - opts: opts - ] - + @spec new(Keyword.t()) :: pid() + def new(opts) do Logger.info("[Jackalope] Starting #{__MODULE__} with #{inspect(opts)}") - {:ok, pid} = GenServer.start_link(__MODULE__, args) + {:ok, pid} = GenServer.start_link(__MODULE__, opts) pid end @impl GenServer - def init(args) do - opts = Keyword.fetch!(args, :opts) + def init(opts) do {db, queue} = start_queue(opts) send(self(), :tick) @@ -46,9 +38,9 @@ defmodule Jackalope.PersistentWorkList do %State{ db: db, queue: queue, - max_work_list_size: Keyword.fetch!(args, :max_size), - expiration_fn: Keyword.fetch!(args, :expiration_fn), - update_expiration_fn: Keyword.fetch!(args, :update_expiration_fn) + max_work_list_size: Keyword.get(opts, :max_size, @default_max_size), + expiration_fn: Keyword.fetch!(opts, :expiration_fn), + update_expiration_fn: Keyword.fetch!(opts, :update_expiration_fn) }, {:continue, :recover}} end diff --git a/lib/jackalope/session.ex b/lib/jackalope/session.ex index 30b97fd..80731c7 100644 --- a/lib/jackalope/session.ex +++ b/lib/jackalope/session.ex @@ -77,12 +77,14 @@ defmodule Jackalope.Session do work_list_mod = Keyword.fetch!(opts, :work_list_mod) work_list = - work_list_mod.new( - fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, - fn {cmd, opts}, expiration -> {cmd, Keyword.put(opts, :expiration, expiration)} end, - max_work_list_size, - opts + Keyword.merge(opts, + expiration_fn: fn {_cmd, opts} -> Keyword.fetch!(opts, :expiration) end, + update_expiration_fn: fn {cmd, opts}, expiration -> + {cmd, Keyword.put(opts, :expiration, expiration)} + end, + max_size: max_work_list_size ) + |> work_list_mod.new() initial_state = %State{ work_list: work_list, diff --git a/lib/jackalope/transient_work_list.ex b/lib/jackalope/transient_work_list.ex index 25a88aa..9638ad3 100644 --- a/lib/jackalope/transient_work_list.ex +++ b/lib/jackalope/transient_work_list.ex @@ -16,10 +16,12 @@ defmodule Jackalope.TransientWorkList do @type t() :: %__MODULE__{items: list(), max_size: non_neg_integer()} @doc "Create a new work list" - @spec new(function(), function(), non_neg_integer(), Keyword.t()) :: t() - def new(expiration_fn, _update_expiration_fn, max_size \\ @default_max_size, _opts \\ []) - when max_size > 0 do - %__MODULE__{max_size: max_size, expiration_fn: expiration_fn} + @spec new(Keyword.t()) :: t() + def new(opts) do + %__MODULE__{ + max_size: Keyword.get(opts, :max_size, @default_max_size), + expiration_fn: Keyword.fetch!(opts, :expiration_fn) + } end @doc false From 4d1341bc3b1042686fbb8dc5eeaf365071da2c9b Mon Sep 17 00:00:00 2001 From: Jean-Francois Cloutier Date: Thu, 20 Jan 2022 15:31:49 -0500 Subject: [PATCH 9/9] max_size --- lib/jackalope/persistent_work_list.ex | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/jackalope/persistent_work_list.ex b/lib/jackalope/persistent_work_list.ex index ad749e6..a67d3c6 100644 --- a/lib/jackalope/persistent_work_list.ex +++ b/lib/jackalope/persistent_work_list.ex @@ -16,7 +16,7 @@ defmodule Jackalope.PersistentWorkList do @moduledoc false defstruct db: nil, queue: nil, - max_work_list_size: nil, + max_size: nil, expiration_fn: nil, update_expiration_fn: nil end @@ -38,7 +38,7 @@ defmodule Jackalope.PersistentWorkList do %State{ db: db, queue: queue, - max_work_list_size: Keyword.get(opts, :max_size, @default_max_size), + max_size: Keyword.get(opts, :max_size, @default_max_size), expiration_fn: Keyword.fetch!(opts, :expiration_fn), update_expiration_fn: Keyword.fetch!(opts, :update_expiration_fn) }, {:continue, :recover}} @@ -167,7 +167,7 @@ defmodule Jackalope.PersistentWorkList do defp pending_items(state), do: get_pending(state) |> Map.values() defp bound_work_items(state) do - max = state.max_work_list_size + max = state.max_size if queue_size(state) > max do :ok = remove_expired_work_items(state) @@ -213,7 +213,7 @@ defmodule Jackalope.PersistentWorkList do # Trim pending as needed to accommodate an additional pending item defp bound_pending_items(pending, state) do - if map_size(pending) > state.max_work_list_size do + if map_size(pending) > state.max_size do # Trim expired pending requests kept_pairs = Enum.reduce( @@ -229,7 +229,7 @@ defmodule Jackalope.PersistentWorkList do ) # If still over maximum, remove the oldest pending request (expiration is smallest) - if length(kept_pairs) > state.max_work_list_size do + if length(kept_pairs) > state.max_size do [{ref, item} | newer_pairs] = Enum.sort(kept_pairs, fn {_, item1}, {_, item2} -> Expiration.after?(state.expiration_fn.(item2), state.expiration_fn.(item1))