Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
# PhoenixClient

## Unreleased

* Breaking changes
* Channel join is asynchronous, now it returns `{:ok, channel} | {:error, term}`
* Channel join doesn't have timeout anymore, now it receives an optional keyword list with options for controlling the backoff retries
* `Socket.channel_join/4` doesn't take the parameters anymore and now is `Socket.channel_join/3` now doesn't send the join message, now the Channel is responsible for that, so it doesn't need the params.
* `Socket.channel_join/3` and `Socket.channel_leave/3` returns `:ok` instead of `{:ok, push}`

* Enhancements
* Channels reconnect on error with exponential backoff, with options for backoff timeouts
* Added `Channel.joined?/1`

* Bug fixes
* When leaving the channel returns `:stop` on the `:leave` callback to stop the process, there were some channel processes leaks

## v0.10.0

* Enhancements
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ You can control how frequently the socket will attempt to reconnect by setting
Next, we will create a client channel and join the remote.

```elixir
{:ok, _response, channel} = PhoenixClient.Channel.join(socket, "rooms:lobby")
{:ok, channel} = PhoenixClient.Channel.join(socket, "rooms:lobby")
```

Now that we have successfully joined the channel, we are ready to push and receive
Expand Down
261 changes: 201 additions & 60 deletions lib/phoenix_client/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,47 @@ defmodule PhoenixClient.Channel do
alias PhoenixClient.{Socket, ChannelSupervisor, Message}

@timeout 5_000
@first_join_ms 0
@backoff_buckets [1_000, 2_000, 5_000]
@max_backoff 10_000

def child_spec({socket, topic, params}) do
@type t :: %__MODULE__{
caller: pid | nil,
socket: pid,
topic: String.t() | atom,
params: map,
options: keyword,
pushes: [{pid, Message.t()}],
join_ref: String.t() | nil,
joined: boolean,
rejoin_ref: reference | nil,
tries: non_neg_integer
}

defstruct [
:caller,
:socket,
:topic,
:join_ref,
:rejoin_ref,
params: %{},
options: [],
pushes: [],
joined: false,
tries: 0
]

def child_spec({socket, topic, params, opts}) do
%{
id: Module.concat(__MODULE__, topic),
start: {__MODULE__, :start_link, [socket, topic, params]},
start: {__MODULE__, :start_link, [socket, topic, params, opts]},
restart: :temporary
}
end

@doc false
def start_link(socket, topic, params) do
GenServer.start_link(__MODULE__, {socket, topic, params})
def start_link(socket, topic, params, opts) do
GenServer.start_link(__MODULE__, {socket, topic, params, opts})
end

@doc false
Expand All @@ -33,27 +62,33 @@ defmodule PhoenixClient.Channel do
multiple processes, you will need to start a new socket process for each channel.

Calling join will link the caller to the channel process.

Optional parameters is a keyword list with the following options:
- `first_join_ms: non_neg_integer` -> First timeout when sending the join message since the process starts. Default `0`
- `backoff_buckets: [non_neg_integer]` -> List of timeouts to be applied on the different backoff retries. Default [1_000, 2_000, 5_000]
- `max_backoff: non_neg_integer` -> Maximum timeout to be applied on the backoff algorithm, if the retry doesn't fit on any bucket, it will use this timeout.
"""
@spec join(pid | atom, binary, map, non_neg_integer) ::
{:ok, map, pid}
@spec join(pid | atom, binary, map, keyword) ::
{:ok, pid}
| {:error, :socket_not_connected}
| {:error, :timeout}
| {:error, any}
def join(socket_pid_or_name, topic, params \\ %{}, timeout \\ @timeout)
def join(nil, _topic, _params, _timeout), do: {:error, :socket_not_started}
def join(socket_pid_or_name, topic, params \\ %{}, opts \\ [])
def join(nil, _topic, _params, _opts), do: {:error, :socket_not_started}

def join(socket_name, topic, params, timeout) when is_atom(socket_name) do
join(Process.whereis(socket_name), topic, params, timeout)
def join(socket_name, topic, params, opts) when is_atom(socket_name) do
join(Process.whereis(socket_name), topic, params, opts)
end

def join(socket_pid, topic, params, timeout) do
if Process.alive?(socket_pid) and Socket.connected?(socket_pid) do
case ChannelSupervisor.start_channel(socket_pid, topic, params) do
{:ok, pid} -> do_join(pid, timeout)
error -> error
end
def join(socket_pid, topic, params, opts) do
with {:socket, true} <- {:socket, Process.alive?(socket_pid)},
{:ok, pid} <- ChannelSupervisor.start_channel(socket_pid, topic, params, opts),
:ok <- set_parent(pid) do
Process.link(pid)

{:ok, pid}
else
{:error, :socket_not_connected}
{:socket, _} -> {:error, :socket_not_connected}
error -> error
end
end

Expand All @@ -63,7 +98,6 @@ defmodule PhoenixClient.Channel do
@spec leave(pid) :: :ok
def leave(pid) do
GenServer.call(pid, :leave)
GenServer.stop(pid)
end

@doc """
Expand All @@ -85,40 +119,58 @@ defmodule PhoenixClient.Channel do
GenServer.cast(pid, {:push, event, payload})
end

# Callbacks
@impl true
def init({socket, topic, params}) do
{:ok,
%{
caller: nil,
socket: socket,
topic: topic,
params: params,
pushes: [],
join_ref: nil
}}
@doc """
Check if the channel is joined or not
"""
@spec joined?(pid) :: boolean
def joined?(pid) do
GenServer.call(pid, :joined?)
end

# Callbacks
@impl true
def handle_call(
:join,
{pid, _ref} = from,
%{socket: socket, topic: topic, params: params} = state
) do
case Socket.channel_join(socket, self(), topic, params) do
{:ok, push} ->
{:noreply,
%{state | join_ref: push.ref, caller: pid, pushes: [{from, push} | state.pushes]}}
def init({socket, topic, params, opts}) do
case Socket.channel_join(socket, self(), topic) do
:ok ->
Process.link(socket)
Process.flag(:trap_exit, true)

first_join = Keyword.get(opts, :first_join_ms, @first_join_ms)
Process.send_after(self(), :join, first_join)

state = %__MODULE__{
socket: socket,
topic: topic,
params: params,
options: opts
}

{:ok, state}

{:error, error} ->
{:stop, error}

error ->
{:reply, error, state}
{:stop, error}
end
end

@impl true
def handle_call(:set_parent, {pid, _ref}, state) do
state = %{state | caller: pid}
{:reply, :ok, state}
end

@impl true
def handle_call(:joined?, _from, %{joined: joined} = state) do
{:reply, joined, state}
end

@impl true
def handle_call(:leave, _from, %{socket: socket, topic: topic} = state) do
send_leave(socket, topic)
Socket.channel_leave(socket, self(), topic)
{:reply, :ok, state}
{:stop, :normal, :ok, state}
end

@impl true
Expand Down Expand Up @@ -149,6 +201,25 @@ defmodule PhoenixClient.Channel do
end

@impl true
def handle_info(:join, %{joined: false} = state) do
state = do_join(state)

{:noreply, state}
end

@impl true
def handle_info(:join, state) do
{:noreply, state}
end

@impl true
def handle_info(%Message{event: "phx_reply", ref: ref, join_ref: ref} = msg, state) do
send_message(msg, state)

state = check_join_response(msg, state)
{:noreply, state}
end

def handle_info(%Message{event: "phx_reply", ref: ref} = msg, %{pushes: pushes} = s) do
pushes =
case Enum.split_with(pushes, &(elem(&1, 1).ref == ref)) do
Expand All @@ -166,29 +237,99 @@ defmodule PhoenixClient.Channel do
end

@impl true
def handle_info(%Message{} = message, %{caller: pid, topic: topic} = state) do
send(pid, %{message | channel_pid: pid, topic: topic})
def handle_info(%Message{event: "phx_close"} = message, state) do
send_message(message, state)
{:stop, :normal, state}
end

@impl true
def handle_info(%Message{event: "phx_error"} = message, state) do
send_message(message, state)

state = rejoin(%{state | joined: false})
{:noreply, state}
end

defp do_join(pid, timeout) do
try do
case GenServer.call(pid, :join, timeout) do
{:ok, reply} ->
Process.link(pid)
{:ok, reply, pid}
@impl true
def handle_info(%Message{} = message, state) do
send_message(message, state)
{:noreply, state}
end

error ->
stop(pid)
error
end
catch
:exit, reason ->
stop(pid)
{:error, exit_reason(reason)}
@impl true
def handle_info({:EXIT, pid, reason}, state) do
cond do
pid == state.socket -> {:stop, reason, state}
pid == state.caller -> {:stop, reason, state}
true -> {:noreply, state}
end
end

defp set_parent(channel_pid) do
GenServer.call(channel_pid, :set_parent)
end

defp do_join(%{socket: socket, topic: topic, params: params} = state) do
state |> socket_send_join(socket, topic, params) |> rejoin()
end

defp socket_send_join(state, socket, topic, params) do
case Socket.connected?(socket) do
true ->
push = send_join(socket, topic, params)
%{state | join_ref: push.ref}

_ ->
state
end
end

defp check_join_response(%{payload: %{"status" => "ok"}}, state) do
cancel_rejoin(state)
%{state | joined: true, rejoin_ref: nil, tries: 0}
end

defp check_join_response(_msg, state) do
rejoin(state)
end

defp send_join(socket, topic, params) do
message = Message.join(topic, params)
Socket.push(socket, message)
end

defp send_leave(socket, topic) do
message = Message.leave(topic)
Socket.push(socket, message)
end

defp rejoin(state) do
cancel_rejoin(state)

backoff_ms = backoff(state)
rejoin_ref = Process.send_after(self(), :join, backoff_ms)

%{state | rejoin_ref: rejoin_ref, tries: state.tries + 1}
end

defp cancel_rejoin(%{rejoin_ref: ref}) when is_reference(ref) do
Process.cancel_timer(ref)
end

defp cancel_rejoin(_), do: :ok

defp backoff(%{tries: tries, options: opts}), do: backoff(tries, opts)

defp backoff(tries, opts) do
buckets = Keyword.get(opts, :backoff_buckets, @backoff_buckets)

case Enum.at(buckets, tries) do
nil -> Keyword.get(opts, :max_backoff, @max_backoff)
timeout -> timeout
end
end

defp exit_reason({:timeout, _}), do: :timeout
defp exit_reason(reason), do: reason
defp send_message(message, %{caller: pid, topic: topic}) do
send(pid, %{message | channel_pid: pid, topic: topic})
end
end
4 changes: 2 additions & 2 deletions lib/phoenix_client/channel_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ defmodule PhoenixClient.ChannelSupervisor do
DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
end

def start_channel(socket, topic, params) do
spec = Channel.child_spec({socket, topic, params})
def start_channel(socket, topic, params, opts) do
spec = Channel.child_spec({socket, topic, params, opts})
DynamicSupervisor.start_child(__MODULE__, spec)
end

Expand Down
Loading