diff --git a/lib/etcd_ex/lease.ex b/lib/etcd_ex/lease.ex new file mode 100644 index 0000000..f105482 --- /dev/null +++ b/lib/etcd_ex/lease.ex @@ -0,0 +1,126 @@ +defmodule EtcdEx.Lease do + @moduledoc """ + This module provides mechanism to set up Etcd leases with limited TTLs + and keeps them alive. + """ + + use GenServer + + require Logger + + defstruct [:conn, :lease_id, :ttl, :keep_alive] + + @retry_delay :timer.seconds(5) + + @doc """ + Gets the lease maintained by the process. + """ + @spec get(pid) :: {:ok, any} | :error + def get(pid) do + lease_id = GenServer.call(pid, :get) + {:ok, lease_id} + catch + :exit, {:noproc, _} -> + :error + end + + @doc """ + Revokes the lease in Etcd and stops the process. + """ + @spec delete(pid) :: :ok + def delete(pid) do + GenServer.call(pid, :delete) + catch + :exit, {:noproc, _} -> + :ok + end + + @doc """ + Starts process that obtains and maintains a lease in Etcd + + Expects `ttl` and `keep_alive` arguments to be in milliseconds. + """ + @spec start_link({term, pos_integer, pos_integer}) :: GenServer.on_start() + def start_link({conn, ttl, keep_alive}) do + GenServer.start_link(__MODULE__, {conn, ttl, keep_alive}) + end + + @impl true + def init({conn, ttl, keep_alive}) do + state = %__MODULE__{ + conn: conn, + ttl: ttl, + keep_alive: keep_alive + } + + {:ok, state, {:continue, :create}} + end + + @impl true + def handle_continue(:create, %__MODULE__{conn: conn, ttl: ttl} = state) do + ttl = :erlang.convert_time_unit(ttl, :millisecond, :second) + + case EtcdEx.grant(conn, ttl) do + {:ok, %{ID: lease_id}} -> + state = + state + |> Map.put(:lease_id, lease_id) + |> schedule_keep_alive() + + {:noreply, state} + + {:error, _reason} -> + {:stop, :shutdown, state} + end + end + + @impl true + def handle_call(:get, _, %__MODULE__{lease_id: lease_id} = state) do + {:reply, lease_id, state} + end + + def handle_call(:delete, _, %__MODULE__{conn: conn, lease_id: lease_id} = state) do + case EtcdEx.revoke(conn, lease_id) do + {:ok, _} -> + {:stop, :shutdown, :ok, state} + + {:error, reason} -> + {:reply, {:error, reason}, state} + end + end + + @impl true + def handle_info(:keep_alive, %__MODULE__{conn: conn, lease_id: lease_id} = state) do + case keep_alive(conn, lease_id) do + {:ok, _} -> + state = schedule_keep_alive(state) + {:noreply, state} + + {:error, :not_found} -> + {:stop, :shutdown, state} + + {:error, _reason} -> + # extending the ttl failed but the lease is still alive, so keep running + state = schedule_keep_alive(state) + {:noreply, state} + end + end + + defp keep_alive(conn, lease_id) do + with {:ok, %{TTL: ttl}} when ttl > 0 <- EtcdEx.ttl(conn, lease_id, keys: false), + {:ok, _} <- EtcdEx.keep_alive(conn, lease_id) do + {:ok, lease_id} + else + {:error, reason} -> {:error, reason} + _ -> {:error, :not_found} + end + end + + defp schedule_keep_alive(state) do + Process.send_after(self(), :keep_alive, next_tick(state)) + state + end + + defp next_tick(%__MODULE__{lease_id: nil}), do: @retry_delay + defp next_tick(%__MODULE__{keep_alive: keep_alive}), do: keep_alive +end diff --git a/lib/etcd_ex/lease/mock.ex b/lib/etcd_ex/lease/mock.ex new file mode 100644 index 0000000..ea7d6b9 --- /dev/null +++ b/lib/etcd_ex/lease/mock.ex @@ -0,0 +1,9 @@ +defmodule EtcdEx.Lease.Mock do + @moduledoc false + use GenServer + + def start_link({_, _, _}), do: GenServer.start_link(__MODULE__, nil) + def get(_), do: {:ok, 666} + def delete(pid), do: GenServer.stop(pid) + def init(_), do: {:ok, nil} +end diff --git a/lib/etcd_ex/repo.ex b/lib/etcd_ex/repo.ex new file mode 100644 index 0000000..645e50c --- /dev/null +++ b/lib/etcd_ex/repo.ex @@ -0,0 +1,326 @@ +defmodule EtcdEx.Repo do + @moduledoc """ + Provides a repository for reading and writing data to etcd. + """ + + use Supervisor + + @impl true + def init(opts) do + {conn_opts, cache_opts} = Keyword.split(opts, [:endpoint]) + module = cache_opts[:module] + conn_opts = Keyword.put(conn_opts, :name, module.conn_name()) + + children = [ + {EtcdEx, conn_opts}, + {__MODULE__.Cache, cache_opts} + ] + + Supervisor.init(children, strategy: :one_for_one) + end + + @doc false + defmacro __using__(opts) do + {otp_app, prefix, pubsub, static_opts} = compile_opts(opts) + + # credo:disable-for-next-line + quote location: :keep do + @behaviour unquote(__MODULE__) + + @default_endpoint {:http, "localhost", 2379, []} + @default_bootstrap_timeout 5_000 + @default_bootstrap_backoff 5_000 + + @prefix unquote(prefix) + @pubsub unquote(pubsub) + + @doc false + def child_spec(opts) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [opts]} + } + end + + @doc """ + Starts repo. + """ + def start_link(runtime_opts) do + default_opts = [ + module: __MODULE__, + endpoint: @default_endpoint, + bootstrap_timeout: @default_bootstrap_timeout, + bootstrap_backoff: @default_bootstrap_backoff + ] + + static_opts = + unquote(static_opts) + |> Keyword.take([:bootstrap_timeout, :bootstrap_backoff]) + |> Keyword.merge( + prefix: @prefix, + pubsub: @pubsub + ) + + application_opts = + if unquote(otp_app) do + unquote(otp_app) + |> Application.fetch_env!(__MODULE__) + |> Keyword.take([:endpoint, :bootstrap_timeout, :bootstrap_backoff]) + else + [] + end + + runtime_opts = + runtime_opts + |> Keyword.take([:endpoint, :bootstrap_timeout, :bootstrap_backoff]) + + opts = + default_opts + |> Keyword.merge(static_opts) + |> Keyword.merge(application_opts) + |> Keyword.merge(runtime_opts) + + Supervisor.start_link(unquote(__MODULE__), opts, name: __MODULE__) + end + + @doc false + def conn_name, do: Module.concat(__MODULE__, Connection) + + @doc false + def cache_name, do: Module.concat(__MODULE__, Cache) + + @doc """ + Returns connection process. + """ + def conn do + Process.whereis(conn_name()) + end + + @doc """ + Returns etcd key prefix. + """ + @spec prefix :: String.t() + def prefix, do: @prefix + + @doc """ + Select cached data with ets match spec. + """ + @spec select(:ets.match_spec()) :: list(term) + def select(match_spec \\ [{:_, [], [:"$_"]}]) do + :ets.select(cache_name(), match_spec) + end + + @doc """ + Retrieves cached value. + """ + @spec get(term) :: term + def get(key) do + case :ets.lookup(cache_name(), key) do + [] -> nil + [{_key, value}] -> value + end + end + + @doc """ + Puts key/value pair into etcd. + """ + @spec put(term, term, keyword) :: {:ok, term} | {:error, term} + def put(key, value, opts \\ []) do + GenServer.call(cache_name(), {:put, key, value, opts}) + end + + @doc """ + Deletes key/value pair from etcd. + """ + @spec delete(term, keyword) :: {:ok, term} | {:error, term} + def delete(key, opts \\ []) do + GenServer.call(cache_name(), {:delete, key, opts}) + end + + if @pubsub do + @doc """ + Subscribes to updates for all keys. + + If the caller also subscribes to updates for specific key using + `subscribe/1`, it will receive duplicate messages for that key. + """ + @spec subscribe :: :ok | {:error, term} + def subscribe do + Phoenix.PubSub.subscribe(@pubsub, unquote(__MODULE__).pubsub_topic(__MODULE__)) + end + + @doc """ + Subscribes to updates for single key. + """ + @spec subscribe(term) :: :ok | {:error, term} + def subscribe(key) do + Phoenix.PubSub.subscribe( + @pubsub, + unquote(__MODULE__).pubsub_topic(key, __MODULE__, @prefix) + ) + end + + @doc """ + Unsubscribes from updates for all keys. + + If the caller also subscribed to updates for specific key using + `subscribe/1`, it will need to unsubscribe using `unsubscribe/1` + in order to stop receiving updates for that key. + """ + @spec unsubscribe :: :ok + def unsubscribe do + Phoenix.PubSub.unsubscribe(@pubsub, unquote(__MODULE__).pubsub_topic(__MODULE__)) + end + + @doc """ + Unsubscribes from updates for single key. + """ + @spec unsubscribe(term) :: :ok + def unsubscribe(key) do + Phoenix.PubSub.unsubscribe( + @pubsub, + unquote(__MODULE__).pubsub_topic(key, __MODULE__, @prefix) + ) + end + end + + @impl true + def decode_key(key, prefix) do + key = String.replace_leading(key, prefix, "") + {:ok, key} + end + + @impl true + def encode_key(key, prefix) when is_binary(key) do + prefix <> key + end + + @impl true + def decode_value(_key, value) do + Jason.decode(value) + end + + @impl true + def encode_value(_key, value) do + Jason.encode!(value) + end + + @impl true + def bootstrap_from_static_data do + {:ok, []} + end + + defoverridable unquote(__MODULE__) + end + end + + @doc """ + Converts etcd key to a more convenient term for local use. + + This term is used to reference the object when retrieving from cache, + writing into etcd and broadcasting changes. + + If decoding fails (function returns `:error` tuple), key/value pair is + ignored. This is to prevent situations when a badly formatted key read from + etcd unexpectedly crashes the process. + + If the tuple is `{:error, :ignore}` the key/value pair is quietly ignored. + If the tuple is `{:error, reason}` the key/value pair is ignored and the reason is logged. + + Default implementation simply removes the prefix from the key. + """ + @callback decode_key(key :: String.t(), prefix :: String.t()) :: {:ok, term} | {:error, term} + + @doc """ + Converts term back to etcd key. + + It is required that this function does the exact reverse of `decode_key` + in order for change broadcasting to work. + + Default implementation simply adds the prefix back. + """ + @callback encode_key(key :: term, prefix :: String.t()) :: String.t() + + @doc """ + Decodes value after reading from etcd. + + If decoding fails (function returns `:error` tuple), key/value pair is + ignored. This is to prevent situations when a badly formatted value read from + etcd unexpectedly crashes the process. + + Default implementation uses Jason.decode/1. + """ + @callback decode_value(key :: term, value :: term) :: {:ok, term} | {:error, term} + + @doc """ + Encodes value before writing into etcd. + + Default implementation uses Jason.encode!/1 + """ + @callback encode_value(key :: term, value :: term) :: term + + @doc """ + Bootstraps from static data. + + When bootstrap from etcd fails, we can fall back to static data to populate the cache + and continue to serve read requests. In the meantime we will keep trying to bootstrap + from etcd in the background. The repo will remain in read-only mode until the bootstrap + is successful. + + This function should return an :ok tuple with the list of %{key: key, value: value} maps + to be inserted into the cache, or an :error. + """ + @callback bootstrap_from_static_data() :: {:ok, [%{key: term, value: term}]} | :error + + defp compile_opts(opts) do + otp_app = + case Keyword.fetch(opts, :otp_app) do + {:ok, otp_app} when is_atom(otp_app) -> + otp_app + + {:ok, other} -> + raise ArgumentError, "expected :otp_app to be atom, got: #{inspect(other)}" + + :error -> + nil + end + + prefix = + case Keyword.fetch(opts, :prefix) do + {:ok, prefix} when is_binary(prefix) -> + prefix + + {:ok, other} -> + raise ArgumentError, "expected :prefix to be string, got: #{inspect(other)}" + + :error -> + raise ArgumentError, "expected :prefix to be present" + end + + pubsub = + with {:ok, pubsub} <- Keyword.fetch(opts, :pubsub), + {:module, _} <- Code.ensure_loaded(Phoenix.PubSub) do + pubsub + else + {:error, _reason} -> + raise ArgumentError, "using :pubsub requires phoenix_pubsub dependency" + + :error -> + nil + end + + opts = Keyword.drop(opts, [:otp_app, :prefix, :pubsub]) + + {otp_app, prefix, pubsub, opts} + end + + @doc false + def pubsub_topic(module) do + "#{module}" + end + + @doc false + def pubsub_topic(key, module, prefix) do + "#{module}:#{module.encode_key(key, prefix)}" + end +end diff --git a/lib/etcd_ex/repo/cache.ex b/lib/etcd_ex/repo/cache.ex new file mode 100644 index 0000000..de1ab6b --- /dev/null +++ b/lib/etcd_ex/repo/cache.ex @@ -0,0 +1,276 @@ +defmodule EtcdEx.Repo.Cache do + @moduledoc false + + use GenServer + require Logger + + alias EtcdEx.Repo + + defstruct [ + :module, + :conn, + :prefix, + :pubsub, + :bootstrap_timeout, + :bootstrap_backoff, + :watch, + :bootstrapped_from_static_data? + ] + + def start_link(opts) do + GenServer.start_link( + __MODULE__, + opts, + name: opts[:module].cache_name() + ) + end + + @impl true + def init(opts) do + :ets.new( + opts[:module].cache_name(), + [:protected, :named_table, read_concurrency: true] + ) + + state = %__MODULE__{ + conn: opts[:module].conn_name(), + module: opts[:module], + prefix: opts[:prefix], + pubsub: opts[:pubsub], + bootstrap_timeout: opts[:bootstrap_timeout], + bootstrap_backoff: opts[:bootstrap_backoff], + bootstrapped_from_static_data?: false + } + + {:ok, state, {:continue, :bootstrap}} + end + + @impl true + def handle_continue( + :bootstrap, + %__MODULE__{module: module, prefix: prefix, pubsub: pubsub} = state + ) do + broadcast_event(:bootstrap_start, module, prefix, pubsub) + {:noreply, do_bootstrap(state)} + end + + @impl true + def handle_call( + {:put, key, value, opts}, + _from, + %__MODULE__{module: module, conn: conn, prefix: prefix} = state + ) do + key = module.encode_key(key, prefix) + value = module.encode_value(key, value) + + {:reply, EtcdEx.put(conn, key, value, opts), state} + end + + def handle_call( + {:delete, key, opts}, + _from, + %__MODULE__{module: module, conn: conn, prefix: prefix} = state + ) do + key = module.encode_key(key, prefix) + + {:reply, EtcdEx.delete(conn, key, opts), state} + end + + @impl true + def handle_info({:etcd_watch_created, _ref}, state) do + {:noreply, state} + end + + def handle_info( + {:etcd_watch_notify, ref, %{events: events, header: %{revision: revision}}}, + %__MODULE__{ + module: module, + prefix: prefix, + pubsub: pubsub, + watch: %{ref: ref} = watch + } = state + ) do + for event <- events do + process_event(event, module, prefix, pubsub) + end + + {:noreply, %{state | watch: Map.put(watch, :revision, revision)}} + end + + def handle_info( + {:etcd_watch_canceled, ref, reason}, + %__MODULE__{module: module, watch: %{ref: ref}} = state + ) do + Logger.error("#{module} etcd watch canceled #{inspect(reason)}") + {:noreply, %{state | watch: nil}, {:continue, :bootstrap}} + end + + def handle_info( + {:etcd_watch_error, reason}, + %__MODULE__{module: module} = state + ) do + Logger.error("#{module} etcd watch error #{reason}") + {:noreply, %{state | watch: nil}, {:continue, :bootstrap}} + end + + defp do_bootstrap(%__MODULE__{module: module, bootstrap_backoff: backoff} = state) do + case bootstrap_from_etcd(state) do + {:ok, state} -> + state + + _ -> + state = bootstrap_from_static_data(state) + Logger.error("#{module} bootstrap from etcd failed, retrying in #{inspect(backoff)}ms") + + :timer.sleep(backoff) + do_bootstrap(state) + end + end + + defp bootstrap_from_etcd( + %__MODULE__{ + module: module, + conn: conn, + prefix: prefix, + pubsub: pubsub, + bootstrap_timeout: timeout + } = state + ) do + ## for testing purposes, we use __MODULE__.get_kvs/3 here instead of a direct call + with {:ok, {revision, kvs}} <- get_kvs(conn, prefix, timeout), + process_kvs(module, prefix, pubsub, kvs), + {:ok, watch_ref} <- watch_prefix(conn, prefix, revision) do + broadcast_event(:bootstrap_stop, module, prefix, pubsub) + Logger.info("#{module} bootstrap from etcd successful") + + state = %{ + state + | watch: %{ref: watch_ref, revision: revision}, + bootstrapped_from_static_data?: false + } + + {:ok, state} + else + _ -> :error + end + catch + _, _ -> :error + end + + defp bootstrap_from_static_data( + %__MODULE__{ + module: module, + prefix: prefix, + pubsub: pubsub, + bootstrapped_from_static_data?: false + } = state + ) do + case module.bootstrap_from_static_data() do + {:ok, kvs} -> + process_kvs(module, prefix, pubsub, kvs) + broadcast_event(:bootstrap_stop, module, prefix, pubsub) + Logger.info("#{module} bootstrap from static data successful") + + %{state | bootstrapped_from_static_data?: true} + + _ -> + state + end + end + + defp bootstrap_from_static_data(%__MODULE__{bootstrapped_from_static_data?: true} = state), + do: state + + defp get_kvs(conn, prefix, timeout) do + case EtcdEx.get(conn, prefix, [prefix: true], timeout) do + {:ok, %{header: %{revision: revision}, kvs: kvs}} -> + {:ok, {revision, kvs}} + + {:error, reason} -> + Logger.error("etcd get #{prefix} failed: #{inspect(reason)}") + :error + end + end + + defp watch_prefix(conn, prefix, revision) do + case EtcdEx.watch(conn, self(), prefix, prefix: true, start_revision: revision) do + {:ok, ref} -> + {:ok, ref} + + {:error, reason} -> + Logger.error("etcd watch #{prefix} failed: #{inspect(reason)}") + :error + end + end + + defp process_kvs(module, prefix, pubsub, kvs) do + :ets.delete_all_objects(module.cache_name()) + + for kv <- kvs do + process_event(%{type: :PUT, kv: kv}, module, prefix, pubsub) + end + end + + defp process_event(%{type: :PUT, kv: %{key: key, value: value}}, module, prefix, pubsub) do + with {:key, {:ok, key}} <- {:key, module.decode_key(key, prefix)}, + {:value, {:ok, value}} <- {:value, module.decode_value(key, value)} do + :ets.insert(module.cache_name(), {key, value}) + broadcast_event({:PUT, key, value}, module, prefix, pubsub) + else + {:key, {:error, :ignore}} -> + :ok + + {:key, {:error, reason}} -> + Logger.error("#{module} failed to decode key #{key} (#{reason})") + + {:value, _} -> + Logger.error("#{module} failed to decode value for key #{key}") + end + end + + defp process_event(%{type: :DELETE, kv: %{key: key}}, module, prefix, pubsub) do + case module.decode_key(key, prefix) do + {:ok, key} -> + :ets.delete(module.cache_name(), key) + broadcast_event({:DELETE, key}, module, prefix, pubsub) + + {:error, :ignore} -> + :ok + + {:error, reason} -> + Logger.error("#{module} failed to decode key #{key} (#{reason})") + end + end + + defp broadcast_event(_event, _module, _prefix, _pubsub = nil), do: :ok + + defp broadcast_event(:bootstrap_start, module, _prefix, pubsub) do + maybe_broadcast(pubsub, Repo.pubsub_topic(module), {:bootstrap_start, module}) + end + + defp broadcast_event(:bootstrap_stop, module, _prefix, pubsub) do + maybe_broadcast(pubsub, Repo.pubsub_topic(module), {:bootstrap_stop, module}) + end + + defp broadcast_event(event, module, prefix, pubsub) do + key = + case event do + {:PUT, key, _value} -> key + {:DELETE, key} -> key + end + + maybe_broadcast(pubsub, Repo.pubsub_topic(module), event) + maybe_broadcast(pubsub, Repo.pubsub_topic(key, module, prefix), event) + end + + defp maybe_broadcast(pubsub, topic, event) do + case Code.ensure_compiled(Phoenix.PubSub) do + {:module, module} -> + module.broadcast(pubsub, topic, event) + + {:error, _} -> + Logger.error("Unable to broadcast event, Phoenix.PubSub not loaded.") + :noop + end + end +end diff --git a/lib/etcd_ex/repo/mock.ex b/lib/etcd_ex/repo/mock.ex new file mode 100644 index 0000000..dff6d46 --- /dev/null +++ b/lib/etcd_ex/repo/mock.ex @@ -0,0 +1,169 @@ +defmodule EtcdEx.Repo.Mock do + @moduledoc """ + Repository mock for testing. + + Use in your test module to setup a mock cache for your repository. + + defmodule MyTest do + use ExUnit.Case + use EtcdEx.Repo.Mock + + setup do + mock_repo(MyRepo, [ + %{key: "/foo", value: "1"}, + ]) + end + end + + Make sure your repository mock includes key-vaule pairs, where values are + JSON-encoded. + """ + + defmacro __using__(_) do + quote location: :keep do + def mock_repo(module, kvs) do + start_supervised(%{ + id: __MODULE__.RepoMocks, + start: {Agent, :start_link, [fn -> %{} end, [name: __MODULE__.RepoMocks]]} + }) + + Mimic.set_mimic_global() + + prefix = module.prefix() + test_pid = self() + ref = make_ref() + + Agent.update(__MODULE__.RepoMocks, fn mocks -> + Map.put(mocks, prefix, {module, nil, test_pid, ref, 1}) + end) + + # etcd watch + Mimic.stub( + EtcdEx, + :watch, + fn _, _, prefix, [{:prefix, true} | _] -> + {test_pid, ref} = + Agent.get(__MODULE__.RepoMocks, fn %{^prefix => {_, _, test_pid, ref, _}} -> + {test_pid, ref} + end) + + send(test_pid, {:etcd_watch, prefix, ref}) + {:ok, ref} + end + ) + + # bootstrap data + Mimic.stub( + EtcdEx, + :get, + fn _, prefix, [prefix: true], _timeout -> + module = + Agent.get(__MODULE__.RepoMocks, fn %{^prefix => {module, _, _, _, _}} -> + module + end) + + {:ok, + %{ + header: %{revision: next_etcd_revision(module)}, + kvs: List.flatten(kvs) + }} + end + ) + + # start cache process + {:ok, pid} = start_supervised(module) + + Agent.update(__MODULE__.RepoMocks, fn mocks -> + Map.update!(mocks, prefix, fn {module, _, test_pid, ref, revision} -> + {module, pid, test_pid, ref, revision} + end) + end) + + # allow time to bootstrap + :timer.sleep(50) + + # stub updates + Mimic.stub( + EtcdEx, + :put, + fn _, key, value, opts -> + {module, test_pid} = + Agent.get(__MODULE__.RepoMocks, fn mocks -> + Enum.find_value(mocks, fn + {prefix, {module, _, test_pid, _, _}} -> + if String.starts_with?(key, prefix) do + {module, test_pid} + end + + _ -> + false + end) + end) + + send(test_pid, {:etcd_put, key, value, opts}) + etcd_notify(module, [%{type: :PUT, kv: %{key: key, value: value}}]) + + {:ok, + %{header: %{revision: next_etcd_revision(module)}, kvs: [%{key: key, value: value}]}} + end + ) + + Mimic.stub( + EtcdEx, + :delete, + fn _, key, _ -> + {module, test_pid} = + Agent.get(__MODULE__.RepoMocks, fn mocks -> + Enum.find_value(mocks, fn + {prefix, {module, _, test_pid, _, _}} -> + if String.starts_with?(key, prefix) do + {module, test_pid} + end + + _ -> + false + end) + end) + + send(test_pid, {:etcd_delete, key}) + etcd_notify(module, [%{type: :DELETE, kv: %{key: key}}]) + {:ok, %{header: %{revision: next_etcd_revision(module)}, kvs: [%{key: key}]}} + end + ) + + :ok + end + + def etcd_notify(module, events) do + ref = + Agent.get(__MODULE__.RepoMocks, fn mocks -> + Enum.find_value(mocks, fn + {prefix, {^module, _, _, ref, _}} -> ref + _ -> false + end) + end) + + send( + module.cache_name(), + {:etcd_watch_notify, ref, + %{events: events, header: %{revision: next_etcd_revision(module)}}} + ) + + # allow time to process events + :timer.sleep(50) + end + + def next_etcd_revision(module) do + Agent.get_and_update(__MODULE__.RepoMocks, fn mocks -> + {prefix, {module, pid, test_pid, ref, revision}} = + Enum.find_value(mocks, fn + {prefix, {^module, _, _, _, _} = mock} -> {prefix, mock} + _ -> false + end) + + {revision, Map.put(mocks, prefix, {module, pid, test_pid, ref, revision + 1})} + end) + end + end + end +end diff --git a/mix.exs b/mix.exs index f721e20..d83ae33 100644 --- a/mix.exs +++ b/mix.exs @@ -41,13 +41,16 @@ defmodule EtcdEx.MixProject do {:mint, "~> 1.0"}, {:protobuf, "~> 0.12"}, {:connection, "~> 1.1"}, + {:jason, "~> 1.0"}, {:telemetry, "~> 0.4 or ~> 1.0"}, # Dev/test dependencies {:dialyxir, "~> 1.3", only: [:dev, :test], runtime: false}, {:stream_data, "~> 0.5", only: :test}, {:excoveralls, "~> 0.10", only: :test}, - {:ex_doc, ">= 0.0.0", only: :dev, runtime: false} + {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, + {:mimic, "~> 1.9.0", only: :test}, + {:phoenix_pubsub, "~> 2.0", only: :test} ] end diff --git a/mix.lock b/mix.lock index d04ac9a..d5b9fc9 100644 --- a/mix.lock +++ b/mix.lock @@ -15,9 +15,11 @@ "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, + "mimic": {:hex, :mimic, "1.9.0", "c96367749a884556718f64657a4bdc99ce0cb5d19333aa04308fbd061c31b8b7", [:mix], [], "hexpm", "92107697938490b300566317c2a1490ef52e23aeac16632c0e56740721189116"}, "mint": {:hex, :mint, "1.4.2", "50330223429a6e1260b2ca5415f69b0ab086141bc76dc2fbf34d7c389a6675b2", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "ce75a5bbcc59b4d7d8d70f8b2fc284b1751ffb35c7b6a6302b5192f8ab4ddd80"}, "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, + "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", "bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"}, "protobuf": {:hex, :protobuf, "0.12.0", "58c0dfea5f929b96b5aa54ec02b7130688f09d2de5ddc521d696eec2a015b223", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "75fa6cbf262062073dd51be44dd0ab940500e18386a6c4e87d5819a58964dc45"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, "stream_data": {:hex, :stream_data, "0.5.0", "b27641e58941685c75b353577dc602c9d2c12292dd84babf506c2033cd97893e", [:mix], [], "hexpm", "012bd2eec069ada4db3411f9115ccafa38540a3c78c4c0349f151fc761b9e271"}, diff --git a/test/etcd_ex/repo_test.exs b/test/etcd_ex/repo_test.exs new file mode 100644 index 0000000..3c4b37e --- /dev/null +++ b/test/etcd_ex/repo_test.exs @@ -0,0 +1,238 @@ +defmodule EtcdEx.RepoTest do + use ExUnit.Case + use EtcdEx.Repo.Mock + + alias EtcdEx.Repo + + import ExUnit.CaptureLog + + defmodule Prefix do + use Repo, prefix: "/test/data" + end + + defmodule CustomKeys do + use Repo, prefix: "/test/data" + + def decode_key(key, prefix) do + key = String.replace_leading(key, prefix, "") + + case key do + "/ignored" <> _ -> {:error, :ignore} + "/bad" <> _ -> {:error, :badkey} + key -> {:ok, String.split(key, "/", trim: true)} + end + end + + def encode_key(key, prefix) do + Enum.join([prefix | key], "/") + end + + def decode_value(_key, value) do + case Jason.decode(value) do + {:ok, 666} -> {:error, :ignore} + {:ok, value} -> {:ok, value} + end + end + end + + defmodule PubSub do + use Repo, prefix: "/test/data", pubsub: :etcd_cache_pubsub + end + + describe "Cache with prefix" do + setup do + mock_repo(Prefix, [ + %{key: "/test/data/foo", value: "1"}, + %{key: "/test/data/bar", value: "2"} + ]) + end + + test "loads data into cache" do + assert Prefix.get("/foo") == 1 + assert Prefix.get("/bar") == 2 + end + + test "watches keys and updates cache" do + assert_received({:etcd_watch, "/test/data", _ref}) + + etcd_notify(Prefix, [ + %{type: :PUT, kv: %{key: "/test/data/foo", value: "true"}}, + %{type: :DELETE, kv: %{key: "/test/data/bar"}}, + %{type: :PUT, kv: %{key: "/test/data/abc", value: "\"string\""}} + ]) + + assert Prefix.get("/foo") == true + assert Prefix.get("/bar") == nil + assert Prefix.get("/abc") == "string" + end + + test "ignores badly formatted values" do + log = + capture_log(fn -> + etcd_notify(Prefix, [ + %{type: :PUT, kv: %{key: "/test/data/ignored/foo", value: "bad"}} + ]) + end) + + assert Prefix.get(["ignored", "foo"]) == nil + + assert log =~ "failed to decode value for key /test/data/ignored/foo" + end + + test "puts data into etcd" do + Prefix.put("/xyz", %{foo: "bar"}) + + assert_received({:etcd_put, "/test/data/xyz", "{\"foo\":\"bar\"}", []}) + + Prefix.put("/xyz", %{foo: "baz"}, lease: 1234) + + assert_received({:etcd_put, "/test/data/xyz", "{\"foo\":\"baz\"}", [lease: 1234]}) + end + + test "deletes data from etcd" do + Prefix.delete("/foo") + + assert_received({:etcd_delete, "/test/data/foo"}) + end + end + + describe "Cache with custom keys" do + setup do + mock_repo(CustomKeys, [ + %{key: "/test/data/a/b", value: "1"}, + %{key: "/test/data/c/d", value: "2"} + ]) + end + + test "loads data into cache" do + assert CustomKeys.get(["a", "b"]) == 1 + assert CustomKeys.get(["c", "d"]) == 2 + end + + test "watches keys and updates cache" do + assert_received({:etcd_watch, "/test/data", _ref}) + + etcd_notify(CustomKeys, [ + %{type: :PUT, kv: %{key: "/test/data/a/b", value: "true"}}, + %{type: :DELETE, kv: %{key: "/test/data/c/d"}}, + %{type: :PUT, kv: %{key: "/test/data/foo", value: "\"string\""}} + ]) + + assert CustomKeys.get(["a", "b"]) == true + assert CustomKeys.get(["c", "d"]) == nil + assert CustomKeys.get(["foo"]) == "string" + end + + test "quietly ignores keys that are marked as ignored" do + log = + capture_log(fn -> + etcd_notify(CustomKeys, [ + %{type: :PUT, kv: %{key: "/test/data/ignored/foo", value: "3"}}, + %{type: :PUT, kv: %{key: "/test/data/ignored", value: "4"}} + ]) + end) + + assert CustomKeys.get(["ignored", "foo"]) == nil + assert CustomKeys.get(["ignored"]) == nil + + refute log =~ "failed to decode key /test/data/ignored/foo" + refute log =~ "failed to decode key /test/data/ignored" + end + + test "ignores and logs keys that fail to decode" do + log = + capture_log(fn -> + etcd_notify(CustomKeys, [ + %{type: :PUT, kv: %{key: "/test/data/bad/foo", value: "3"}}, + %{type: :PUT, kv: %{key: "/test/data/bad", value: "4"}} + ]) + end) + + assert CustomKeys.get(["bad", "foo"]) == nil + assert CustomKeys.get(["bad"]) == nil + + assert log =~ "failed to decode key /test/data/bad/foo" + assert log =~ "failed to decode key /test/data/bad" + end + + test "ignores badly formatted values" do + log = + capture_log(fn -> + etcd_notify(CustomKeys, [ + %{type: :PUT, kv: %{key: "/test/data/foo", value: "666"}} + ]) + end) + + assert CustomKeys.get(["foo"]) == nil + + assert log =~ "failed to decode value for key /test/data/foo" + end + + test "puts data into etcd" do + CustomKeys.put(["x", "y", "z"], %{foo: "bar"}) + + assert_received({:etcd_put, "/test/data/x/y/z", "{\"foo\":\"bar\"}", []}) + + CustomKeys.put(["x", "y", "z"], %{foo: "baz"}, lease: 1234) + + assert_received({:etcd_put, "/test/data/x/y/z", "{\"foo\":\"baz\"}", [lease: 1234]}) + end + + test "deletes data from etcd" do + CustomKeys.delete(["a", "b"]) + + assert_received({:etcd_delete, "/test/data/a/b"}) + end + end + + describe "Cache with pubsub" do + setup do + start_supervised!({Phoenix.PubSub, name: :etcd_cache_pubsub}) + + mock_repo(PubSub, [ + %{key: "/test/data/foo", value: "1"}, + %{key: "/test/data/bar", value: "2"} + ]) + end + + test "broadcasts updates for all keys to subscribers" do + :ok = PubSub.subscribe() + + etcd_notify(PubSub, [ + %{type: :PUT, kv: %{key: "/test/data/foo", value: "3"}}, + %{type: :DELETE, kv: %{key: "/test/data/bar"}} + ]) + + assert_received({:PUT, "/foo", 3}) + assert_received({:DELETE, "/bar"}) + + :ok = PubSub.unsubscribe() + + etcd_notify(PubSub, [ + %{type: :PUT, kv: %{key: "/test/data/foo", value: "4"}} + ]) + + refute_received({:PUT, "/foo", _}) + end + + test "broadcasts updates for single key to subscribers" do + :ok = PubSub.subscribe("/foo") + + etcd_notify(PubSub, [ + %{type: :PUT, kv: %{key: "/test/data/foo", value: "3"}}, + %{type: :DELETE, kv: %{key: "/test/data/bar"}} + ]) + + assert_received({:PUT, "/foo", 3}) + refute_received({:DELETE, "/bar"}) + + :ok = PubSub.unsubscribe("/foo") + + etcd_notify(PubSub, [ + %{type: :PUT, kv: %{key: "/test/data/foo", value: "4"}} + ]) + + refute_received({:PUT, "/foo", _}) + end + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 869559e..039df1a 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1 +1,4 @@ ExUnit.start() + +Mimic.copy(EtcdEx) +Mimic.copy(EtcdEx.Repo.Cache)