Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/tortoise311/connection/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ defmodule Tortoise311.Connection.Controller do
@impl GenServer
def init(%State{handler: handler} = opts) do
{:ok, _} = Tortoise311.Events.register(opts.client_id, :status)
{:ok, _} = Tortoise311.Events.register(opts.client_id, :connection)

case Handler.execute(handler, :init) do
{:ok, %Handler{} = updated_handler} ->
Expand Down Expand Up @@ -239,6 +240,16 @@ defmodule Tortoise311.Connection.Controller do
end
end

def handle_info(
{{Tortoise311, client_id}, :connection, {server, socket}},
%State{client_id: client_id, handler: handler} = state
) do
case Handler.execute(handler, {:connected, server, socket}) do
{:ok, updated_handler} ->
{:noreply, %State{state | handler: updated_handler}}
end
end

def handle_info({{Tortoise311, client_id}, ref, result}, %{client_id: client_id} = state) do
case {result, Map.pop(state.awaiting, ref)} do
{_, {nil, _}} ->
Expand Down
34 changes: 34 additions & 0 deletions lib/tortoise311/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ defmodule Tortoise311.Handler do
`:down`, allowing for functionality to be run when the
connection state changes.

- `connected/3` is called when a connection is made, making the socket
and the server type available.

- `subscription/3` is called when a topic filter subscription
changes status, so this callback can be used to control the
life-cycle of a subscription, allowing us to implement custom
Expand Down Expand Up @@ -110,6 +113,9 @@ defmodule Tortoise311.Handler do
initial_args: term(),
state: term()
}
@type socket :: any()
@type server :: :ssl | :tcp

@enforce_keys [:module, :initial_args]
defstruct module: nil, state: nil, initial_args: []

Expand Down Expand Up @@ -143,6 +149,11 @@ defmodule Tortoise311.Handler do
{:ok, state}
end

@impl Tortoise311.Handler
def connected(_server, _socket, state) do
{:ok, state}
end

@impl Tortoise311.Handler
def subscription(_status, _topic_filter, state) do
{:ok, state}
Expand Down Expand Up @@ -196,6 +207,23 @@ defmodule Tortoise311.Handler do
"""
@callback last_will(state) :: {{:ok, term() | nil}, state} when state: any()

@doc """
Invoked when the connection is made.

`server` is the transport module

Returning `{:ok, new_state}` will set the state for later
invocations.

Returning `{:ok, new_state, next_actions}`, where `next_actions` is
a list of next actions such as `{:unsubscribe, "foo/bar"}` will
result in the state being returned and the next actions performed.
"""
@callback connected(server, socket, state :: term()) ::
{:ok, new_state}
| {:ok, new_state, [next_action()]}
when new_state: term()

@doc """
Invoked when the connection status changes.

Expand Down Expand Up @@ -342,6 +370,12 @@ defmodule Tortoise311.Handler do
|> handle_result(handler)
end

def execute(handler, {:connected, server, socket}) do
handler.module
|> apply(:connected, [server, socket, handler.state])
|> handle_result(handler)
end

def execute(handler, {:publish, %Package.Publish{} = publish}) do
topic_list = String.split(publish.topic, "/")

Expand Down
5 changes: 5 additions & 0 deletions lib/tortoise311/handler/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ defmodule Tortoise311.Handler.Logger do
{:ok, state}
end

def connected(server, socket, state) do
Logger.warning("Connected via #{inspect(server)} socket #{inspect(socket)}")
{:ok, state}
end

def subscription(:up, topic, state) do
Logger.info("Subscribed to #{topic}")
{:ok, state}
Expand Down