# A simple Phoenix.PubSub adapter that relays messages between nodes over AMQP.
#
# Add it to the application's supervision tree with:
#
#     {Phoenix.PubSub, name: App.PubSub, adapter: Phoenix.PubSub.AMQP, amqp: [connection: amqp_connection_id]}
defmodule Phoenix.PubSub.AMQP do | |
use Supervisor | |
require Logger | |
@behaviour Phoenix.PubSub.Adapter | |
@doc """
Starts the adapter's supervisor with the options handed over by `Phoenix.PubSub`.

The supervisor is registered under this module's name, so only one instance
of the adapter can run per node.
"""
def start_link(opts), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
@impl Supervisor
def init(opts) do
  # `opts` looks like:
  # [adapter_name: App.PubSub.Adapter, name: App.PubSub, adapter: Phoenix.PubSub.AMQP, amqp: [connection: ...]]
  # The gateway is the only child and receives the options untouched.
  gateway_spec = {__MODULE__.Gateway, opts}

  Supervisor.init([gateway_spec], strategy: :one_for_one)
end
# Forwards cluster-wide broadcasts to the gateway process registered as `adapter_name`.
@impl Phoenix.PubSub.Adapter
def broadcast(adapter_name, topic, message, dispatcher) do
  __MODULE__.Gateway.broadcast(adapter_name, topic, message, dispatcher)
end
# Forwards single-node broadcasts to the gateway process registered as `adapter_name`.
@impl Phoenix.PubSub.Adapter
def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do
  __MODULE__.Gateway.direct_broadcast(adapter_name, node_name, topic, message, dispatcher)
end
# The adapter identifies nodes by their Erlang node name; the adapter name is not used.
@impl Phoenix.PubSub.Adapter
def node_name(_adapter_name) do
  node()
end
defmodule Gateway do
  @moduledoc """
  GenServer that owns the AMQP channel used by `Phoenix.PubSub.AMQP`.

  It publishes outgoing broadcasts to a fanout exchange named after the PubSub
  instance, consumes this node's own queue, and re-dispatches every incoming
  message to local subscribers through `Phoenix.PubSub.local_broadcast/3`.
  """

  use GenServer
  require Logger

  # Delay, in milliseconds, before retrying when no channel could be opened.
  @reconnect_interval 100

  # Prefetch count passed to `AMQP.Basic.qos/2` for the consuming channel.
  @prefetch_count 25

  # connection: id handed to `AMQP.Application.get_connection/1`
  # ch:         the open `AMQP.Channel`, or nil while disconnected
  # name:       the PubSub name used for local dispatch
  # ident:      `name` as a string; used for the exchange name and as the queue name prefix
  # node_name:  this node's name as a string; stamped on every published message
  defstruct [:connection, :ch, :name, :ident, :node_name]

  ## Client API

  @doc """
  Starts the gateway and registers it under `opts[:adapter_name]`.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:adapter_name])
  end

  @doc """
  Publishes `message` for `topic` to the fanout exchange.

  Returns `:ok`, `{:error, :not_connected}` while no channel is open, or the
  error returned by `AMQP.Basic.publish/5`.
  """
  def broadcast(adapter_name, topic, message, dispatcher) do
    GenServer.call(adapter_name, {:broadcast, topic, message, dispatcher})
  end

  @doc """
  Sends `message` for `topic` to `node_name` only.

  The request always goes through the gateway process: local dispatch needs
  the PubSub name, which only the gateway knows (`adapter_name` is the
  gateway's registered name, not the PubSub name). A message addressed to the
  local node is dispatched to local subscribers by the gateway; any other
  target is published to that node's queue.
  """
  def direct_broadcast(adapter_name, node_name, topic, message, dispatcher) do
    GenServer.call(adapter_name, {:direct_broadcast, node_name, topic, message, dispatcher})
  end

  ## Server callbacks

  @impl true
  def init(opts) do
    state = %__MODULE__{
      connection: opts[:amqp][:connection],
      name: opts[:name],
      ident: Atom.to_string(opts[:name]),
      node_name: Atom.to_string(node())
    }

    # Connecting happens in handle_continue/2 so start-up is not blocked by the broker.
    {:ok, state, {:continue, :connect}}
  end

  @impl true
  def handle_continue(:connect, state) do
    with {:ok, conn} <- AMQP.Application.get_connection(state.connection),
         {:ok, ch} <- AMQP.Channel.open(conn) do
      # The monitor lets us reconnect when the channel dies (see the :DOWN clause).
      Process.monitor(ch.pid)
      setup_channel_disposal(ch)
      :ok = AMQP.Basic.qos(ch, prefetch_count: @prefetch_count)
      {:noreply, %__MODULE__{state | ch: ch}, {:continue, :setup_topology}}
    else
      error ->
        # Debug level only: this branch runs on every retry while the broker is unavailable.
        Logger.debug(
          "#{inspect(__MODULE__)} could not open an AMQP channel, retrying: #{inspect(error)}"
        )

        Process.send_after(self(), :try_connect, @reconnect_interval)
        {:noreply, state}
    end
  end

  def handle_continue(:setup_topology, %__MODULE__{ch: ch} = state) do
    exchange = exchange(state)

    :ok = AMQP.Exchange.fanout(ch, exchange, durable: true)
    {:ok, %{queue: queue}} = AMQP.Queue.declare(ch, queue(state), durable: true)
    :ok = AMQP.Queue.bind(ch, queue, exchange)
    # A nil consumer pid is passed through as in the original call; deliveries
    # are handled by this process in handle_info/2.
    {:ok, _tag} = AMQP.Basic.consume(ch, queue, nil)

    {:noreply, state}
  end

  @impl true
  def handle_call(
        {:broadcast, _topic, _message, _dispatcher},
        _from,
        %__MODULE__{ch: nil} = state
      ) do
    {:reply, {:error, :not_connected}, state}
  end

  def handle_call({:broadcast, topic, message, _dispatcher}, _from, state) do
    {:reply, publish(state, exchange(state), "", topic, message), state}
  end

  # Local target: no broker round-trip, and it works while disconnected.
  # This clause must stay ahead of the `ch: nil` clause below.
  def handle_call({:direct_broadcast, node_name, topic, message, dispatcher}, _from, state)
      when node_name == node() do
    {:reply, Phoenix.PubSub.local_broadcast(state.name, topic, message, dispatcher), state}
  end

  def handle_call(
        {:direct_broadcast, _node_name, _topic, _message, _dispatcher},
        _from,
        %__MODULE__{ch: nil} = state
      ) do
    {:reply, {:error, :not_connected}, state}
  end

  def handle_call({:direct_broadcast, node_name, topic, message, _dispatcher}, _from, state) do
    # Publishing with an empty exchange name and the target node's queue name as routing key.
    {:reply, publish(state, "", queue(state, node_name), topic, message), state}
  end

  @impl true
  def handle_info(:try_connect, state) do
    {:noreply, state, {:continue, :connect}}
  end

  def handle_info({:basic_consume_ok, %{consumer_tag: _}}, state) do
    {:noreply, state}
  end

  def handle_info(
        {:basic_deliver, payload, %{delivery_tag: dtag, headers: headers}},
        state
      ) do
    # SECURITY NOTE(review): binary_to_term/1 trusts whatever is on the queue.
    # Switching to the `[:safe]` option would refuse terms containing atoms not
    # yet known to this node, so it is flagged here rather than changed.
    message = :erlang.binary_to_term(payload)
    topic = header(headers, "topic")
    sender = header(headers, "sender")

    # The fanout exchange also feeds our own queue. Skip messages carrying our
    # own stamp, compared against the exact value publish/5 writes into the
    # "sender" header. The dispatcher given to broadcast/4 is not part of the
    # published message, so local_broadcast/3 runs with its default dispatcher.
    if sender != state.node_name do
      Phoenix.PubSub.local_broadcast(state.name, topic, message)
    end

    AMQP.Basic.ack(state.ch, dtag)
    {:noreply, state}
  end

  def handle_info(
        {:DOWN, _ref, :process, pid, _reason},
        %__MODULE__{ch: %AMQP.Channel{pid: pid}} = state
      ) do
    # Our channel died: forget it and start over with a fresh one.
    {:noreply, %__MODULE__{state | ch: nil}, {:continue, :connect}}
  end

  def handle_info(msg, state) do
    Logger.error(
      "#{__MODULE__} #{Process.info(self())[:registered_name]} received unexpected message in handle_info/2: #{inspect(msg)}"
    )

    {:noreply, state}
  end

  @impl true
  def terminate(reason, _state) do
    Logger.info("exiting: #{inspect(reason)}")
    :ok
  end

  ## Helpers

  # Publishes the serialized message, stamped with its topic and our node name.
  # Returns whatever `AMQP.Basic.publish/5` returns so callers see publish errors.
  defp publish(%__MODULE__{ch: ch, node_name: sender}, exchange, routing_key, topic, message) do
    AMQP.Basic.publish(ch, exchange, routing_key, :erlang.term_to_binary(message),
      headers: [{"topic", :longstr, topic}, {"sender", :longstr, sender}]
    )
  end

  # The exchange is named after the PubSub instance.
  defp exchange(%__MODULE__{ident: ident}), do: ident

  # Queue names are "<ident>.<node>"; the one-argument form is this node's queue.
  defp queue(state), do: queue(state, node())
  defp queue(%__MODULE__{ident: ident}, n), do: "#{ident}.#{n}"

  # Returns the value of the AMQP header `name`, or nil when it is absent.
  defp header(headers, name) do
    case List.keyfind(headers, name, 0) do
      {^name, _, value} -> value
      nil -> nil
    end
  end

  # Spawns an unlinked watcher that closes the channel once the gateway dies.
  # The watcher exits without doing anything if the channel goes down first.
  defp setup_channel_disposal(%AMQP.Channel{pid: ch_pid} = ch) do
    gateway = self()
    ready = make_ref()

    spawn(fn ->
      Process.monitor(gateway)
      Process.monitor(ch_pid)
      send(gateway, ready)

      receive do
        {:DOWN, _, _, ^gateway, _} ->
          AMQP.Channel.close(ch)
          nil

        {:DOWN, _, _, ^ch_pid, _} ->
          nil
      end
    end)

    # Block until the watcher has installed both monitors.
    receive do
      ^ready -> :ok
    end
  end
end
end |