Skip to content

Instantly share code, notes, and snippets.

@narrowtux
Created September 20, 2017 12:02
Show Gist options
  • Save narrowtux/d1b01b50713ab525ddfa280f905e51de to your computer and use it in GitHub Desktop.
Save narrowtux/d1b01b50713ab525ddfa280f905e51de to your computer and use it in GitHub Desktop.
RabbitMQ strategy for libcluster
defmodule Cluster.RabbitStrategy do
  @moduledoc """
  libcluster strategy that discovers peer nodes through a RabbitMQ fanout exchange.

  Every node running this strategy:

    * opens an AMQP connection and channel using the `:amqp` option,
    * declares a durable fanout exchange and binds a private, exclusive queue to it,
    * periodically publishes a heartbeat containing its node name and cookie, and
    * connects to every node whose heartbeat it receives.

  ## Options

  Required top-level options: `:topology`, `:connect`, `:disconnect`,
  `:list_nodes`, `:config` and `:amqp` (passed unchanged to
  `AMQP.Connection.open/1`).

  Keys read from `:config`:

    * `:exchange` - name of the fanout exchange (default `"libcluster"`).
    * `:replace_hostname` - when `true`, the host part of the advertised node
      name is replaced with the result of `:inet.gethostname/0`
      (default `false`).
    * `:heartbeat_interval` - milliseconds between heartbeats
      (default `5000`). Must be a non-negative integer.

  ## Security

  The heartbeat carries the Erlang cookie and is decoded with
  `:erlang.binary_to_term/1`. Anyone who can read from or publish to the
  exchange can therefore learn the cookie or feed arbitrary terms to this
  process. Only use this strategy with a broker that is restricted to
  trusted peers.
  """

  use GenServer
  use Cluster.Strategy
  import Cluster.Logger
  alias Cluster.Strategy.State

  @routing_key "cluster.heartbeat"
  @default_heartbeat_interval 5_000

  @doc """
  Starts the strategy process, registered under the module name.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc false
  def init(opts) do
    state = %State{
      topology: Keyword.fetch!(opts, :topology),
      connect: Keyword.fetch!(opts, :connect),
      disconnect: Keyword.fetch!(opts, :disconnect),
      list_nodes: Keyword.fetch!(opts, :list_nodes),
      config: Keyword.fetch!(opts, :config)
    }

    connection_params = Keyword.fetch!(opts, :amqp)
    {:ok, connection} = AMQP.Connection.open(connection_params)
    {:ok, chan} = AMQP.Channel.open(connection)

    # Tie our lifetime to the AMQP processes: if either one dies, this
    # process exits too and can be restarted with a fresh connection.
    Process.link(connection.pid)
    Process.link(chan.pid)

    exchange_name = Keyword.get(state.config, :exchange, "libcluster")
    :ok = AMQP.Exchange.fanout(chan, exchange_name, durable: true)

    # Declared with an empty name; the actual queue name is taken from the
    # broker's reply. The queue is exclusive to this connection.
    {:ok, %{queue: queue_name}} = AMQP.Queue.declare(chan, "", exclusive: true)
    :ok = AMQP.Queue.bind(chan, queue_name, exchange_name)
    {:ok, _consumer_tag} = AMQP.Basic.consume(chan, queue_name, self())

    meta = %{
      queue: queue_name,
      exchange: exchange_name,
      channel: chan,
      replace_hostname: Keyword.get(state.config, :replace_hostname, false),
      heartbeat_interval:
        Keyword.get(state.config, :heartbeat_interval, @default_heartbeat_interval),
      heartbeat_timer: nil
    }

    # Announce ourselves as soon as init/1 has returned.
    send(self(), :heartbeat)
    {:ok, %{state | meta: meta}}
  end

  @doc false
  # Kept for backward compatibility: a GenServer timeout is treated as a
  # heartbeat trigger.
  def handle_info(:timeout, state), do: handle_info(:heartbeat, state)

  # Publishes our heartbeat and schedules the next one, so that nodes which
  # join later (or lost their connection to us) learn about this node again.
  def handle_info(
        :heartbeat,
        %{meta: %{exchange: exchange, channel: chan, replace_hostname: replace} = meta} = state
      ) do
    payload = heartbeat(replace)
    AMQP.Basic.publish(chan, exchange, @routing_key, payload)
    {:noreply, %{state | meta: schedule_heartbeat(meta)}}
  end

  # A message arrived on our queue: process it, then acknowledge it.
  def handle_info(
        {:basic_deliver, payload, %{delivery_tag: dt}},
        %{meta: %{channel: chan}} = state
      ) do
    handle_heartbeat(state, payload)
    AMQP.Basic.ack(chan, dt)
    {:noreply, state}
  end

  # Consumer registration confirmed.
  def handle_info({:basic_consume_ok, _}, state), do: {:noreply, state}

  # The consumer was cancelled; stop so that a restart re-creates the queue
  # and the subscription.
  def handle_info({:basic_cancel, _} = msg, state) do
    {:stop, msg, state}
  end

  # Confirmation of a cancel; nothing to do.
  def handle_info({:basic_cancel_ok, _}, state) do
    {:noreply, state}
  end

  # (Re)arms the heartbeat timer. Any pending timer is cancelled first so an
  # extra `:heartbeat` / `:timeout` message cannot start a second timer chain.
  defp schedule_heartbeat(%{heartbeat_interval: interval, heartbeat_timer: timer} = meta) do
    if is_reference(timer), do: Process.cancel_timer(timer)
    %{meta | heartbeat_timer: Process.send_after(self(), :heartbeat, interval)}
  end

  # Returns the node name to advertise. With `true`, everything after the
  # `@` in `Node.self()` is replaced by the local hostname.
  defp node_name(false), do: Node.self()

  defp node_name(true) do
    {:ok, hostname} = :inet.gethostname()

    Node.self()
    |> Atom.to_string()
    |> String.replace(~r/@.+/, "@#{hostname}")
    |> String.to_atom()
  end

  # Builds the heartbeat payload: the literal prefix "heartbeat::" followed
  # by the external term format of `%{node: ..., cookie: ...}`.
  # Default value for replace_hostname is `false`.
  defp heartbeat(replace_hostname) do
    msg = %{
      node: node_name(replace_hostname),
      cookie: Node.get_cookie()
    }

    "heartbeat::" <> :erlang.term_to_binary(msg)
  end

  # Handles a payload received from the exchange. Payloads without the
  # "heartbeat::" prefix are ignored. Always returns `:ok`.
  defp handle_heartbeat(
         %State{connect: connect, list_nodes: list_nodes} = state,
         <<"heartbeat::", rest::binary>>
       ) do
    # A fanout exchange also delivers our own heartbeat back to us.
    own_name = node_name(state.meta.replace_hostname)

    case decode_heartbeat(rest) do
      {:ok, %{node: ^own_name}} ->
        :ok

      {:ok, %{node: n, cookie: cookie}} when is_atom(n) and is_atom(cookie) ->
        debug(state.topology, "received heartbeat from #{n}")
        if cookie != :nocookie, do: Node.set_cookie(n, cookie)
        Cluster.Strategy.connect_nodes(state.topology, connect, list_nodes, [n])
        :ok

      {:ok, other} ->
        warn(state.topology, "Got invalid message for cluster initialization: #{inspect(other)}")
        :ok

      :error ->
        warn(state.topology, "Got undecodable heartbeat payload (#{byte_size(rest)} bytes)")
        :ok
    end
  end

  defp handle_heartbeat(_state, _packet) do
    :ok
  end

  # Decodes the term part of a heartbeat without crashing on malformed input.
  #
  # SECURITY NOTE(review): this deliberately does not pass `[:safe]`, because
  # peer node names are usually atoms this node has not seen yet and `:safe`
  # would reject them. The payload must therefore come from a trusted broker.
  defp decode_heartbeat(binary) do
    {:ok, :erlang.binary_to_term(binary)}
  rescue
    ArgumentError -> :error
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment