diff --git a/CHANGELOG.md b/CHANGELOG.md index 5554bdf..b0fc9e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,20 @@ # PhoenixClient +## Unreleased + +* Breaking changes + * Channel join is asynchronous, now it returns `{:ok, channel} | {:error, term}` + * Channel join doesn't have timeout anymore, now it receives an optional keyword list with options for controlling the backoff retries + * `Socket.channel_join/4` doesn't take the parameters anymore and now is `Socket.channel_join/3` now doesn't send the join message, now the Channel is responsible for that, so it doesn't need the params. + * `Socket.channel_join/3` and `Socket.channel_leave/3` returns `:ok` instead of `{:ok, push}` + +* Enhancements + * Channels reconnect on error with exponential backoff, with options for backoff timeouts + * Added `Channel.joined?/1` + +* Bug fixes + * When leaving the channel returns `:stop` on the `:leave` callback to stop the process, there were some channel processes leaks + ## v0.10.0 * Enhancements diff --git a/README.md b/README.md index c3da9e8..083de4c 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ You can control how frequently the socket will attempt to reconnect by setting Next, we will create a client channel and join the remote. ```elixir -{:ok, _response, channel} = PhoenixClient.Channel.join(socket, "rooms:lobby") +{:ok, channel} = PhoenixClient.Channel.join(socket, "rooms:lobby") ``` Now that we have successfully joined the channel, we are ready to push and receive diff --git a/lib/phoenix_client/channel.ex b/lib/phoenix_client/channel.ex index 58d33ad..79d4fef 100644 --- a/lib/phoenix_client/channel.ex +++ b/lib/phoenix_client/channel.ex @@ -4,18 +4,47 @@ defmodule PhoenixClient.Channel do alias PhoenixClient.{Socket, ChannelSupervisor, Message} @timeout 5_000 + @first_join_ms 0 + @backoff_buckets [1_000, 2_000, 5_000] + @max_backoff 10_000 - def child_spec({socket, topic, params}) do + @type t :: %__MODULE__{ + caller: pid | nil, + socket: pid, + topic: String.t() | atom, + params: map, + options: keyword, + pushes: [{pid, Message.t()}], + join_ref: String.t() | nil, + joined: boolean, + rejoin_ref: reference | nil, + tries: non_neg_integer + } + + defstruct [ + :caller, + :socket, + :topic, + :join_ref, + :rejoin_ref, + params: %{}, + options: [], + pushes: [], + joined: false, + tries: 0 + ] + + def child_spec({socket, topic, params, opts}) do %{ id: Module.concat(__MODULE__, topic), - start: {__MODULE__, :start_link, [socket, topic, params]}, + start: {__MODULE__, :start_link, [socket, topic, params, opts]}, restart: :temporary } end @doc false - def start_link(socket, topic, params) do - GenServer.start_link(__MODULE__, {socket, topic, params}) + def start_link(socket, topic, params, opts) do + GenServer.start_link(__MODULE__, {socket, topic, params, opts}) end @doc false @@ -33,27 +62,33 @@ defmodule PhoenixClient.Channel do multiple processes, you will need to start a new socket process for each channel. Calling join will link the caller to the channel process. + + Optional parameters is a keyword list with the following options: + - `first_join_ms: non_neg_integer` -> First timeout when sending the join message since the process starts. Default `0` + - `backoff_buckets: [non_neg_integer]` -> List of timeouts to be applied on the different backoff retries. Default [1_000, 2_000, 5_000] + - `max_backoff: non_neg_integer` -> Maximum timeout to be applied on the backoff algorithm, if the retry doesn't fit on any bucket, it will use this timeout. """ - @spec join(pid | atom, binary, map, non_neg_integer) :: - {:ok, map, pid} + @spec join(pid | atom, binary, map, keyword) :: + {:ok, pid} | {:error, :socket_not_connected} - | {:error, :timeout} | {:error, any} - def join(socket_pid_or_name, topic, params \\ %{}, timeout \\ @timeout) - def join(nil, _topic, _params, _timeout), do: {:error, :socket_not_started} + def join(socket_pid_or_name, topic, params \\ %{}, opts \\ []) + def join(nil, _topic, _params, _opts), do: {:error, :socket_not_started} - def join(socket_name, topic, params, timeout) when is_atom(socket_name) do - join(Process.whereis(socket_name), topic, params, timeout) + def join(socket_name, topic, params, opts) when is_atom(socket_name) do + join(Process.whereis(socket_name), topic, params, opts) end - def join(socket_pid, topic, params, timeout) do - if Process.alive?(socket_pid) and Socket.connected?(socket_pid) do - case ChannelSupervisor.start_channel(socket_pid, topic, params) do - {:ok, pid} -> do_join(pid, timeout) - error -> error - end + def join(socket_pid, topic, params, opts) do + with {:socket, true} <- {:socket, Process.alive?(socket_pid)}, + {:ok, pid} <- ChannelSupervisor.start_channel(socket_pid, topic, params, opts), + :ok <- set_parent(pid) do + Process.link(pid) + + {:ok, pid} else - {:error, :socket_not_connected} + {:socket, _} -> {:error, :socket_not_connected} + error -> error end end @@ -63,7 +98,6 @@ defmodule PhoenixClient.Channel do @spec leave(pid) :: :ok def leave(pid) do GenServer.call(pid, :leave) - GenServer.stop(pid) end @doc """ @@ -85,40 +119,58 @@ defmodule PhoenixClient.Channel do GenServer.cast(pid, {:push, event, payload}) end - # Callbacks - @impl true - def init({socket, topic, params}) do - {:ok, - %{ - caller: nil, - socket: socket, - topic: topic, - params: params, - pushes: [], - join_ref: nil - }} + @doc """ + Check if the channel is joined or not + """ + @spec joined?(pid) :: boolean + def joined?(pid) do + GenServer.call(pid, :joined?) end + # Callbacks @impl true - def handle_call( - :join, - {pid, _ref} = from, - %{socket: socket, topic: topic, params: params} = state - ) do - case Socket.channel_join(socket, self(), topic, params) do - {:ok, push} -> - {:noreply, - %{state | join_ref: push.ref, caller: pid, pushes: [{from, push} | state.pushes]}} + def init({socket, topic, params, opts}) do + case Socket.channel_join(socket, self(), topic) do + :ok -> + Process.link(socket) + Process.flag(:trap_exit, true) + + first_join = Keyword.get(opts, :first_join_ms, @first_join_ms) + Process.send_after(self(), :join, first_join) + + state = %__MODULE__{ + socket: socket, + topic: topic, + params: params, + options: opts + } + + {:ok, state} + + {:error, error} -> + {:stop, error} error -> - {:reply, error, state} + {:stop, error} end end + @impl true + def handle_call(:set_parent, {pid, _ref}, state) do + state = %{state | caller: pid} + {:reply, :ok, state} + end + + @impl true + def handle_call(:joined?, _from, %{joined: joined} = state) do + {:reply, joined, state} + end + @impl true def handle_call(:leave, _from, %{socket: socket, topic: topic} = state) do + send_leave(socket, topic) Socket.channel_leave(socket, self(), topic) - {:reply, :ok, state} + {:stop, :normal, :ok, state} end @impl true @@ -149,6 +201,25 @@ defmodule PhoenixClient.Channel do end @impl true + def handle_info(:join, %{joined: false} = state) do + state = do_join(state) + + {:noreply, state} + end + + @impl true + def handle_info(:join, state) do + {:noreply, state} + end + + @impl true + def handle_info(%Message{event: "phx_reply", ref: ref, join_ref: ref} = msg, state) do + send_message(msg, state) + + state = check_join_response(msg, state) + {:noreply, state} + end + def handle_info(%Message{event: "phx_reply", ref: ref} = msg, %{pushes: pushes} = s) do pushes = case Enum.split_with(pushes, &(elem(&1, 1).ref == ref)) do @@ -166,29 +237,99 @@ defmodule PhoenixClient.Channel do end @impl true - def handle_info(%Message{} = message, %{caller: pid, topic: topic} = state) do - send(pid, %{message | channel_pid: pid, topic: topic}) + def handle_info(%Message{event: "phx_close"} = message, state) do + send_message(message, state) + {:stop, :normal, state} + end + + @impl true + def handle_info(%Message{event: "phx_error"} = message, state) do + send_message(message, state) + + state = rejoin(%{state | joined: false}) {:noreply, state} end - defp do_join(pid, timeout) do - try do - case GenServer.call(pid, :join, timeout) do - {:ok, reply} -> - Process.link(pid) - {:ok, reply, pid} + @impl true + def handle_info(%Message{} = message, state) do + send_message(message, state) + {:noreply, state} + end - error -> - stop(pid) - error - end - catch - :exit, reason -> - stop(pid) - {:error, exit_reason(reason)} + @impl true + def handle_info({:EXIT, pid, reason}, state) do + cond do + pid == state.socket -> {:stop, reason, state} + pid == state.caller -> {:stop, reason, state} + true -> {:noreply, state} + end + end + + defp set_parent(channel_pid) do + GenServer.call(channel_pid, :set_parent) + end + + defp do_join(%{socket: socket, topic: topic, params: params} = state) do + state |> socket_send_join(socket, topic, params) |> rejoin() + end + + defp socket_send_join(state, socket, topic, params) do + case Socket.connected?(socket) do + true -> + push = send_join(socket, topic, params) + %{state | join_ref: push.ref} + + _ -> + state + end + end + + defp check_join_response(%{payload: %{"status" => "ok"}}, state) do + cancel_rejoin(state) + %{state | joined: true, rejoin_ref: nil, tries: 0} + end + + defp check_join_response(_msg, state) do + rejoin(state) + end + + defp send_join(socket, topic, params) do + message = Message.join(topic, params) + Socket.push(socket, message) + end + + defp send_leave(socket, topic) do + message = Message.leave(topic) + Socket.push(socket, message) + end + + defp rejoin(state) do + cancel_rejoin(state) + + backoff_ms = backoff(state) + rejoin_ref = Process.send_after(self(), :join, backoff_ms) + + %{state | rejoin_ref: rejoin_ref, tries: state.tries + 1} + end + + defp cancel_rejoin(%{rejoin_ref: ref}) when is_reference(ref) do + Process.cancel_timer(ref) + end + + defp cancel_rejoin(_), do: :ok + + defp backoff(%{tries: tries, options: opts}), do: backoff(tries, opts) + + defp backoff(tries, opts) do + buckets = Keyword.get(opts, :backoff_buckets, @backoff_buckets) + + case Enum.at(buckets, tries) do + nil -> Keyword.get(opts, :max_backoff, @max_backoff) + timeout -> timeout end end - defp exit_reason({:timeout, _}), do: :timeout - defp exit_reason(reason), do: reason + defp send_message(message, %{caller: pid, topic: topic}) do + send(pid, %{message | channel_pid: pid, topic: topic}) + end end diff --git a/lib/phoenix_client/channel_supervisor.ex b/lib/phoenix_client/channel_supervisor.ex index c2de3b3..65dd725 100644 --- a/lib/phoenix_client/channel_supervisor.ex +++ b/lib/phoenix_client/channel_supervisor.ex @@ -7,8 +7,8 @@ defmodule PhoenixClient.ChannelSupervisor do DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__) end - def start_channel(socket, topic, params) do - spec = Channel.child_spec({socket, topic, params}) + def start_channel(socket, topic, params, opts) do + spec = Channel.child_spec({socket, topic, params, opts}) DynamicSupervisor.start_child(__MODULE__, spec) end diff --git a/lib/phoenix_client/socket.ex b/lib/phoenix_client/socket.ex index 984a1a4..2fce204 100644 --- a/lib/phoenix_client/socket.ex +++ b/lib/phoenix_client/socket.ex @@ -39,8 +39,8 @@ defmodule PhoenixClient.Socket do end @doc false - def channel_join(pid, channel, topic, params) do - GenServer.call(pid, {:channel_join, channel, topic, params}) + def channel_join(pid, channel, topic) do + GenServer.call(pid, {:channel_join, channel, topic}) end @doc false @@ -121,17 +121,15 @@ defmodule PhoenixClient.Socket do @impl true def handle_call( - {:channel_join, channel_pid, topic, params}, + {:channel_join, channel_pid, topic}, _from, %{channels: channels} = state ) do case Map.get(channels, topic) do nil -> monitor_ref = Process.monitor(channel_pid) - message = Message.join(topic, params) - {push, state} = push_message(message, state) channels = Map.put(channels, topic, {channel_pid, monitor_ref}) - {:reply, {:ok, push}, %{state | channels: channels}} + {:reply, :ok, %{state | channels: channels}} {pid, _topic} -> {:reply, {:error, {:already_joined, pid}}, state} @@ -146,10 +144,8 @@ defmodule PhoenixClient.Socket do {_channel_pid, monitor_ref} -> Process.demonitor(monitor_ref) - message = Message.leave(topic) - {push, state} = push_message(message, state) channels = Map.drop(channels, [topic]) - {:reply, {:ok, push}, %{state | channels: channels}} + {:reply, :ok, %{state | channels: channels}} end end @@ -275,7 +271,7 @@ defmodule PhoenixClient.Socket do end defp close(reason, %{channels: channels, reconnect_timer: nil} = state) do - state = %{state | status: :disconnected, channels: %{}} + state = %{state | status: :disconnected} message = %Message{event: close_event(reason), payload: %{reason: reason}} diff --git a/test/phoenix_client_test.exs b/test/phoenix_client_test.exs index c291711..1bf811c 100644 --- a/test/phoenix_client_test.exs +++ b/test/phoenix_client_test.exs @@ -1,7 +1,6 @@ defmodule PhoenixClientTest do use ExUnit.Case, async: false use RouterHelper - import ExUnit.CaptureLog import Plug.Conn, except: [assign: 3, push: 3] alias __MODULE__.Endpoint @@ -69,6 +68,10 @@ defmodule PhoenixClientTest do raise "boom" end + def handle_in("stop", message, socket) do + {:stop, :normal, {:ok, message}, socket} + end + def handle_in(_, _message, socket) do {:noreply, socket} end @@ -172,7 +175,8 @@ defmodule PhoenixClientTest do def handle_continue(:connect_to_channel, state) do {:ok, socket} = Socket.start_link(PhoenixClientTest.socket_config()) PhoenixClientTest.wait_for_socket(socket) - {:ok, _, channel} = Channel.join(socket, "rooms:admin-lobby") + {:ok, channel} = Channel.join(socket, "rooms:admin-lobby") + PhoenixClientTest.wait_for_channel(channel) {:noreply, %{state | channel: channel, socket: socket}} end @@ -208,13 +212,16 @@ defmodule PhoenixClientTest do test "socket can join a channel" do {:ok, socket} = Socket.start_link(@socket_config) wait_for_socket(socket) - assert {:ok, _, channel} = Channel.join(socket, "rooms:admin-lobby") + assert {:ok, channel} = Channel.join(socket, "rooms:admin-lobby") + wait_for_channel(channel) + assert Channel.joined?(channel) end test "socket cannot join more than one channel of the same topic" do {:ok, socket} = Socket.start_link(@socket_config) wait_for_socket(socket) - assert {:ok, _, channel} = Channel.join(socket, "rooms:admin-lobby") + assert {:ok, channel} = Channel.join(socket, "rooms:admin-lobby") + wait_for_channel(channel) assert {:error, {:already_joined, ^channel}} = Channel.join(socket, "rooms:admin-lobby") end @@ -222,7 +229,25 @@ defmodule PhoenixClientTest do {:ok, socket} = Socket.start_link(@socket_config) wait_for_socket(socket) message = %{"foo" => "bar"} - assert {:ok, ^message, _channel} = Channel.join(socket, "rooms:reply", message) + assert {:ok, channel} = Channel.join(socket, "rooms:reply", message) + wait_for_channel(channel) + + assert_receive %Message{ + ref: ref, + join_ref: ref, + payload: %{"response" => ^message, "status" => "ok"} + } + end + + test "socket can join a channel waiting for an specific amount of time" do + {:ok, socket} = Socket.start_link(@socket_config) + wait_for_socket(socket) + message = %{"foo" => "bar"} + assert {:ok, channel} = Channel.join(socket, "rooms:reply", message, first_join_ms: 400) + Process.sleep(200) + refute Channel.joined?(channel) + Process.sleep(200) + assert_receive %Message{payload: %{"response" => ^message, "status" => "ok"}} end test "return an error if socket is down" do @@ -233,14 +258,16 @@ defmodule PhoenixClientTest do {:ok, socket} = Socket.start_link(@socket_config) wait_for_socket(socket) user_id = "123" - assert {:ok, _, _} = Channel.join(socket, "rooms:admin-lobby", %{user: user_id}) + assert {:ok, channel} = Channel.join(socket, "rooms:admin-lobby", %{user: user_id}) + wait_for_channel(channel) assert_receive %Message{event: "user:entered", payload: %{"user" => ^user_id}} end test "socket can leave a channel" do {:ok, socket} = Socket.start_link(@socket_config) wait_for_socket(socket) - {:ok, _, channel} = Channel.join(socket, "rooms:admin-lobby") + {:ok, channel} = Channel.join(socket, "rooms:admin-lobby") + wait_for_channel(channel) assert :ok = Channel.leave(channel) refute Process.alive?(channel) end @@ -248,20 +275,16 @@ defmodule PhoenixClientTest do test "client can push to a channel" do {:ok, socket} = Socket.start_link(@socket_config) wait_for_socket(socket) - {:ok, _, channel} = Channel.join(socket, "rooms:admin-lobby") + {:ok, channel} = Channel.join(socket, "rooms:admin-lobby") + wait_for_channel(channel) assert {:ok, %{"test" => "test"}} = Channel.push(channel, "new:msg", %{test: :test}) end - test "join timeouts" do - {:ok, socket} = Socket.start_link(@socket_config) - wait_for_socket(socket) - {:error, :timeout} = Channel.join(socket, "rooms:join_timeout", %{}, 1) - end - test "push timeouts" do {:ok, socket} = Socket.start_link(@socket_config) wait_for_socket(socket) - {:ok, _, channel} = Channel.join(socket, "rooms:admin-lobby") + {:ok, channel} = Channel.join(socket, "rooms:admin-lobby") + wait_for_channel(channel) assert catch_exit(Channel.push(channel, "foo:bar", %{}, 500)) end @@ -269,9 +292,11 @@ defmodule PhoenixClientTest do test "push async" do {:ok, socket} = Socket.start_link(@socket_config) wait_for_socket(socket) - {:ok, _, channel} = Channel.join(socket, "rooms:admin-lobby") + {:ok, channel} = Channel.join(socket, "rooms:admin-lobby") + wait_for_channel(channel) - assert :ok = Channel.push_async(channel, "foo:bar", %{}) + assert :ok = Channel.push_async(channel, "new:msg", %{test: :test}) + assert_receive %Message{payload: %{"response" => %{"test" => "test"}}} end test "socket params can be sent" do @@ -301,18 +326,56 @@ defmodule PhoenixClientTest do endpoint = context[:endpoint] {:ok, socket} = Socket.start_link(@socket_config) wait_for_socket(socket) - assert {:ok, _, channel} = Channel.join(socket, "rooms:admin-lobby") + opts = [backoff_buckets: [], max_backoff: 100] + assert {:ok, channel} = Channel.join(socket, "rooms:admin-lobby", %{}, opts) + wait_for_channel(channel) Process.exit(endpoint, :kill) - :timer.sleep(10) + :timer.sleep(100) assert_receive %Message{event: "phx_error"} + assert Process.alive?(socket) refute Socket.connected?(socket) + assert Process.alive?(channel) + refute Channel.joined?(channel) start_endpoint() wait_for_socket(socket) - :sys.get_state(socket) - assert {:ok, _, channel} = Channel.join(socket, "rooms:admin-lobby") + wait_for_channel(channel) + assert {:error, {:already_joined, ^channel}} = Channel.join(socket, "rooms:admin-lobby") + end + + test "closes channel when normal server stop" do + {:ok, socket} = Socket.start_link(@socket_config) + wait_for_socket(socket) + opts = [backoff_buckets: [], max_backoff: 100] + assert {:ok, channel} = Channel.join(socket, "rooms:admin-lobby", %{}, opts) + wait_for_channel(channel) + + Channel.push_async(channel, "stop", %{test: :test}) + + assert_receive %Message{payload: %{"response" => %{"test" => "test"}, "status" => "ok"}} + assert_receive %Message{event: "phx_close"} + + refute Process.alive?(channel) + end + + test "rejoin channel when error" do + {:ok, socket} = Socket.start_link(@socket_config) + wait_for_socket(socket) + opts = [backoff_buckets: [], max_backoff: 100] + assert {:ok, channel} = Channel.join(socket, "rooms:admin-lobby", %{}, opts) + wait_for_channel(channel) + + Channel.push_async(channel, "boom", %{}) + + assert_receive %Message{event: "phx_error"} + + assert Process.alive?(channel) + + assert_receive %Message{event: "you:left"} + wait_for_channel(channel) + assert Channel.joined?(channel) end test "use async with genserver" do @@ -326,8 +389,10 @@ defmodule PhoenixClientTest do config = Keyword.put(@socket_config, :headers, [{"x-extra", "value"}]) {:ok, socket} = Socket.start_link(config) wait_for_socket(socket) - {:ok, headers, _channel} = Channel.join(socket, "rooms:headers") - assert %{"x-extra" => "value"} = headers + {:ok, channel} = Channel.join(socket, "rooms:headers") + wait_for_channel(channel) + assert_receive %Message{payload: %{"response" => %{"x-extra" => "value"}}} + # assert %{"x-extra" => "value"} = headers end defp assert_message(pid, message, counter \\ 0) @@ -350,20 +415,17 @@ defmodule PhoenixClientTest do end end + def wait_for_channel(channel) do + unless Channel.joined?(channel) do + wait_for_channel(channel) + end + end + def socket_config(), do: @socket_config defp start_endpoint() do - self = self() - - capture_log(fn -> - {:ok, pid} = Endpoint.start_link() - send(self, {:pid, pid}) - end) - - receive do - {:pid, pid} -> - Process.unlink(pid) - [endpoint: pid] - end + {:ok, pid} = Endpoint.start_link() + Process.unlink(pid) + [endpoint: pid] end end