diff --git a/lib/tortoise311/connection/controller.ex b/lib/tortoise311/connection/controller.ex index c17a1e5..e2d3886 100644 --- a/lib/tortoise311/connection/controller.ex +++ b/lib/tortoise311/connection/controller.ex @@ -106,6 +106,7 @@ defmodule Tortoise311.Connection.Controller do @impl GenServer def init(%State{handler: handler} = opts) do {:ok, _} = Tortoise311.Events.register(opts.client_id, :status) + {:ok, _} = Tortoise311.Events.register(opts.client_id, :connection) case Handler.execute(handler, :init) do {:ok, %Handler{} = updated_handler} -> @@ -239,6 +240,16 @@ defmodule Tortoise311.Connection.Controller do end end + def handle_info( + {{Tortoise311, client_id}, :connection, {server, socket}}, + %State{client_id: client_id, handler: handler} = state + ) do + case Handler.execute(handler, {:connected, server, socket}) do + {:ok, updated_handler} -> + {:noreply, %State{state | handler: updated_handler}} + end + end + def handle_info({{Tortoise311, client_id}, ref, result}, %{client_id: client_id} = state) do case {result, Map.pop(state.awaiting, ref)} do {_, {nil, _}} -> diff --git a/lib/tortoise311/handler.ex b/lib/tortoise311/handler.ex index d8848f1..8574c2c 100644 --- a/lib/tortoise311/handler.ex +++ b/lib/tortoise311/handler.ex @@ -23,6 +23,9 @@ defmodule Tortoise311.Handler do `:down`, allowing for functionality to be run when the connection state changes. + - `connected/3` is called when a connection is made, making the socket + and the server type available. + - `subscription/3` is called when a topic filter subscription changes status, so this callback can be used to control the life-cycle of a subscription, allowing us to implement custom @@ -110,6 +113,9 @@ defmodule Tortoise311.Handler do initial_args: term(), state: term() } + @type socket :: any() + @type server :: :ssl | :tcp + @enforce_keys [:module, :initial_args] defstruct module: nil, state: nil, initial_args: [] @@ -143,6 +149,11 @@ defmodule Tortoise311.Handler do {:ok, state} end + @impl Tortoise311.Handler + def connected(_server, _socket, state) do + {:ok, state} + end + @impl Tortoise311.Handler def subscription(_status, _topic_filter, state) do {:ok, state} @@ -196,6 +207,23 @@ defmodule Tortoise311.Handler do """ @callback last_will(state) :: {{:ok, term() | nil}, state} when state: any() + @doc """ + Invoked when the connection is made. + + `server` is the transport module + + Returning `{:ok, new_state}` will set the state for later + invocations. + + Returning `{:ok, new_state, next_actions}`, where `next_actions` is + a list of next actions such as `{:unsubscribe, "foo/bar"}` will + result in the state being returned and the next actions performed. + """ + @callback connected(server, socket, state :: term()) :: + {:ok, new_state} + | {:ok, new_state, [next_action()]} + when new_state: term() + @doc """ Invoked when the connection status changes. @@ -342,6 +370,12 @@ defmodule Tortoise311.Handler do |> handle_result(handler) end + def execute(handler, {:connected, server, socket}) do + handler.module + |> apply(:connected, [server, socket, handler.state]) + |> handle_result(handler) + end + def execute(handler, {:publish, %Package.Publish{} = publish}) do topic_list = String.split(publish.topic, "/") diff --git a/lib/tortoise311/handler/logger.ex b/lib/tortoise311/handler/logger.ex index e1988ba..bc1b68a 100644 --- a/lib/tortoise311/handler/logger.ex +++ b/lib/tortoise311/handler/logger.ex @@ -29,6 +29,11 @@ defmodule Tortoise311.Handler.Logger do {:ok, state} end + def connected(server, socket, state) do + Logger.warning("Connected via #{inspect(server)} socket #{inspect(socket)}") + {:ok, state} + end + def subscription(:up, topic, state) do Logger.info("Subscribed to #{topic}") {:ok, state}