Skip to content

Commit

Permalink
Merge pull request #180 from pma/feature/rich-connection
Browse files Browse the repository at this point in the history
Named connections and channels
  • Loading branch information
ono authored Jan 3, 2021
2 parents 78e79fa + 2355bfd commit c8789fe
Show file tree
Hide file tree
Showing 5 changed files with 571 additions and 4 deletions.
194 changes: 190 additions & 4 deletions lib/amqp/application.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
defmodule AMQP.Application do
@moduledoc false
@moduledoc """
Provides access to configured connections and channels.
"""

use Application
require Logger

@impl true
def start(_type, _args) do
children = []

load_config()
children = load_connections() ++ load_channels()

opts = [
strategy: :one_for_one,
name: AMQP.Application,
max_restarts: length(children) * 2,
max_seconds: 1
]

opts = [strategy: :one_for_one, name: AMQP.Application]
Supervisor.start_link(children, opts)
end

Expand All @@ -20,6 +27,30 @@ defmodule AMQP.Application do
end
end

defp load_connections do
conn = Application.get_env(:amqp, :connection)
conns = Application.get_env(:amqp, :connections, [])
conns = if conn, do: conns ++ [default: conn], else: conns

Enum.map(conns, fn {name, opts} ->
arg = opts ++ [proc_name: name]
id = AMQP.Application.Connection.get_server_name(name)
Supervisor.child_spec({AMQP.Application.Connection, arg}, id: id)
end)
end

defp load_channels do
chan = Application.get_env(:amqp, :channel)
chans = Application.get_env(:amqp, :channels, [])
chans = if chan, do: chans ++ [default: chan], else: chans

Enum.map(chans, fn {name, opts} ->
arg = opts ++ [proc_name: name]
id = AMQP.Application.Channel.get_server_name(name)
Supervisor.child_spec({AMQP.Application.Channel, arg}, id: id)
end)
end

@doc """
Disables the progress report logging from Erlang library.
Expand Down Expand Up @@ -50,4 +81,159 @@ defmodule AMQP.Application do
error -> error
end
end

@doc """
Provides an easy way to access an AMQP connection.
The connection will be monitored by AMQP's GenServer and it will automatically try to reconnect when the connection is gone.
## Usage
When you want to have a single connection in your app:
config :amqp, connection: [
url: "amqp://guest:guest@localhost:15672"
]
You can also use any options available on `AMQP.Connection.open/2`:
config :amqp, connection: [
host: "localhost",
port: 15672
username: "guest",
password: "guest"
]
Then the connection will be open at the start of the application and you can access via this function.
iex> {:ok, conn} = AMQP.Application.get_connection()
By default, it tries to connect to your local RabbitMQ. You can simply pass the empty keyword list too:
config :amqp, connection: [] # == [url: "amqp://0.0.0.0"]
You can set up multiple connections wth `:connections` key:
config :amqp, connections: [
business_report: [
url: "amqp://host1"
],
analytics: [
url: "amqp://host2"
]
]
Then you can access each connection with its name.
iex> {:ok, conn1} = AMQP.Application.get_connection(:business_report)
iex> {:ok, conn2} = AMQP.Application.get_connection(:analytics)
The defaut name is :default so These two configurations are equivalent:
config :amqp, connection: []
config :amqp, connections: [default: []]
## Configuration options
* `:retry_interval` - The retry interval in milliseconds when the connection is failed to open (default `5000`)
* `:url` - AMQP URI for the connection
See also `AMQP.Connection.open/2` for all available options.
"""
@spec get_connection(binary | atom) :: {:ok, AMQP.Connection.t()} | {:error, any}
def get_connection(name \\ :default) do
AMQP.Application.Connection.get_connection(name)
end

@doc """
Provides an easy way to access an AMQP channel.
AMQP.Application provides a wrapper on top of `AMQP.Channel` with .
The channel will be monitored by AMQP's GenServer and it will automatically try to reopen when the channel is gone.
## Usage
When you want to have a single channel in your app:
config :amqp,
connection: [url: "amqp://guest:guest@localhost:15672"],
channel: []
Then the channel will be open at the start of the application and you can access it via this function.
iex> {:ok, chan} = AMQP.Application.get_channel()
You can also set up multiple channels wth `:channels` key:
config :amqp,
connections: [
business_report: [url: "amqp://host1"],
analytics: [url: "amqp://host2"]
],
channels: [
bisiness_report: [connection: :business_report],
analytics: [connection: :analytics]
]
Then you can access each channel with its name.
iex> {:ok, conn1} = AMQP.Application.get_channel(:business_report)
iex> {:ok, conn2} = AMQP.Application.get_channel(:analytics)
You can also have multiple channels for a single connection.
config :amqp,
connection: [],
channels: [
consumer: [],
producer: []
]
## Configuration options
* `:connection` - The connection name configured with `connection` or `connections` (default `:default`)
* `:retry_interval` - The retry interval in milliseconds when the channel is failed to open (default `5000`)
## Caveat
Although AMQP will reopen the named channel automatically when it is closed for some reasons,
your application still needs to monitor the channel for a consumer process.
Be aware the channel reponed doesn't automatically recover the subscription of your consumer
Here is a sample GenServer module that monitors the channel and re-subscribe the channel.
defmodule AppConsumer do
use GenServer
@channel :default
@queue "myqueue"
....
def handle_info(:subscribe, state) do
subscribe()
{noreply, state}
end
def handle_info({:DOWN, _, :process, pid, reason}, state) do
send(self(), :subscribe)
{:noreply, state}
end
defp subscribe() do
case AMQP.Application.get_channel(@channel) do
{:ok, chan} ->
Process.monitor(chan.pid)
AMQP.Basic.consume(@channel, @queue)
_error ->
Process.send_after(self(), :subscribe, 1000)
{:error, :retrying}
end
end
end
"""
@spec get_channel(binary | atom) :: {:ok, AMQP.Channel.t()} | {:error, any}
def get_channel(name \\ :default) do
AMQP.Application.Channel.get_channel(name)
end
end
160 changes: 160 additions & 0 deletions lib/amqp/application/channel.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
defmodule AMQP.Application.Channel do
@moduledoc false
# This module will stay as a private module at least during 2.0.x.
# There might be non backward compatible changes on this module on 2.1.x.

use GenServer
require Logger
alias AMQP.Channel

@default_interval 5_000

@doc """
Starts a GenServer process linked to the current process.
## Examples
Combines name and retry interval with the connection options.
iex> opts = [proc_name: :my_chan, retry_interval: 10_000, connection: :my_conn]
iex> :ok = AMQP.Application.Channel.start_link(opts)
iex> {:ok, chan} = AMQP.Application.Channel.get_channel(:my_chan)
If you omit the proc_name, it uses :default.
iex> :ok = AMQP.Application.Channel.start_link([])
iex> {:ok, chan} = AMQP.Application.Channel.get_channel()
iex> {:ok, chan} = AMQP.Application.Channel.get_channel(:default)
"""
@spec start_link(keyword) :: GenServer.on_start()
def start_link(opts) do
{name, init_arg} = link_opts_to_init_arg(opts)

GenServer.start_link(__MODULE__, init_arg, name: name)
end

defp link_opts_to_init_arg(opts) do
proc_name = Keyword.get(opts, :proc_name, :default)
server_name = get_server_name(proc_name)
retry_interval = Keyword.get(opts, :retry_interval, @default_interval)
connection = Keyword.get(opts, :connection, proc_name)

init_arg = %{
retry_interval: retry_interval,
connection: connection,
name: proc_name,
monitor_ref: nil,
channel: nil
}

{server_name, init_arg}
end

@doc """
Returns a GenServer reference for the channel name
"""
@spec get_server_name(binary | atom) :: binary
def get_server_name(name) do
:"#{__MODULE__}::#{name}"
end

@doc false
def get_state(name \\ :default) do
GenServer.call(get_server_name(name), :get_state)
end

@doc """
Returns pid for the server referred by the name.
It is a wrapper of `GenServer.whereis/1`.
"""
@spec whereis(binary() | atom()) :: pid() | {atom(), node()} | nil
def whereis(name) do
name
|> get_server_name()
|> GenServer.whereis()
end

@doc """
Returns a channel referred by the name.
"""
@spec get_channel(binary | atom) :: {:ok, Channel.t()} | {:error, any}
def get_channel(name \\ :default) do
case GenServer.call(get_server_name(name), :get_channel) do
nil -> {:error, :channel_not_ready}
channel -> {:ok, channel}
end
end

@impl true
def init(state) do
send(self(), :open)
Process.flag(:trap_exit, true)
{:ok, state}
end

@impl true
def handle_call(:get_state, _, state) do
{:reply, state, state}
end

def handle_call(:get_channel, _, state) do
if state[:channel] && Process.alive?(state[:channel].pid) do
{:reply, state[:channel], state}
else
{:reply, nil, state}
end
end

@impl true
def handle_info(:open, state) do
case AMQP.Application.Connection.get_connection(state[:connection]) do
{:ok, conn} ->
case Channel.open(conn) do
{:ok, chan} ->
ref = Process.monitor(chan.pid)
{:noreply, %{state | channel: chan, monitor_ref: ref}}

{:error, error} ->
Logger.error("Failed to open an AMQP channel(#{state[:name]}) - #{inspect(error)}")
Process.send_after(self(), :open, state[:retry_interval])
{:noreply, state}
end

_error ->
Logger.error(
"Failed to open an AMQP channel(#{state[:name]}). Connection (#{state[:connection]}) is not ready."
)

Process.send_after(self(), :open, state[:retry_interval])
{:noreply, state}
end
end

def handle_info({:DOWN, _, :process, pid, _reason}, %{channel: %{pid: pid}} = state)
when is_pid(pid) do
Logger.info("AMQP channel is gone (#{state[:name]}). Reopening...")
send(self(), :open)
{:noreply, %{state | channel: nil, monitor_ref: nil}}
end

def handle_info({:EXIT, _from, reason}, state) do
close(state)
{:stop, reason, %{state | channel: nil, monitor_ref: nil}}
end

@impl true
def terminate(_reason, state) do
close(state)
%{state | channel: nil, monitor_ref: nil}
end

defp close(%{channel: %Channel{} = channel, monior_ref: ref}) do
if Process.alive?(channel.pid) do
Process.demonitor(ref)
Channel.close(channel)
end
end

defp close(_), do: :ok
end
Loading

0 comments on commit c8789fe

Please sign in to comment.