From b82ffa2e2ed78c0362f3188e8a238f1a429ac828 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Tue, 29 Dec 2020 14:52:11 +0000 Subject: [PATCH 1/5] Create a connection from config --- lib/amqp/application.ex | 15 +++- lib/amqp/application/connection.ex | 122 +++++++++++++++++++++++++++++ 2 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 lib/amqp/application/connection.ex diff --git a/lib/amqp/application.ex b/lib/amqp/application.ex index f7a6898..b129677 100644 --- a/lib/amqp/application.ex +++ b/lib/amqp/application.ex @@ -6,9 +6,8 @@ defmodule AMQP.Application do @impl true def start(_type, _args) do - children = [] - load_config() + children = load_connections() |> IO.inspect() opts = [strategy: :one_for_one, name: AMQP.Application] Supervisor.start_link(children, opts) @@ -20,6 +19,18 @@ defmodule AMQP.Application do end end + defp load_connections do + conn = Application.get_env(:amqp, :connection) + conns = Application.get_env(:amqp, :connections, []) + conns = if conn, do: conns ++ [default: conn], else: conns + + Enum.map(conns, fn {name, opts} -> + arg = opts ++ [proc_name: name] + id = AMQP.Application.Connection.get_server_name(name) + Supervisor.child_spec({AMQP.Application.Connection, arg}, id: id) + end) + end + @doc """ Disables the progress report logging from Erlang library. diff --git a/lib/amqp/application/connection.ex b/lib/amqp/application/connection.ex new file mode 100644 index 0000000..7352cf1 --- /dev/null +++ b/lib/amqp/application/connection.ex @@ -0,0 +1,122 @@ +defmodule AMQP.Application.Connection do + @moduledoc """ + + """ + + use GenServer + require Logger + alias AMQP.Connection + + @default_interval 5_000 + + @doc """ + Starts a GenServer process linked to the current process. + + It expects options to be a combination of connection args, proc_name and retry_interval. + + ## Examples + + Combines name and retry interval with the connection options. + + iex> opts = [proc_name: :my_conn, retry_interval: 10_000, host: "localhost"] + iex> :ok = AMQP.Application.Connection.start_link(opts) + iex> {:ok, conn} = AMQP.Application.Connection.get_connection(:my_conn) + + Passes URL instead of options and use a default proc name when you need only a single connection. + + iex> opts = [url: "amqp://guest:guest@localhost"] + iex> {:ok, conn} = AMQP.Application.Connection.get_connection() + iex> {:ok, conn} = AMQP.Application.Connection.get_connection(:default) + """ + @spec start_link(keyword) :: GenServer.on_start() + def start_link(opts) do + {name, init_arg} = link_opts_to_init_arg(opts) + + GenServer.start_link(__MODULE__, init_arg, name: name) + end + + defp link_opts_to_init_arg(opts) do + proc_name = Keyword.get(opts, :proc_name, :default) + server_name = get_server_name(proc_name) + retry_interval = Keyword.get(opts, :retry_interval, @default_interval) + open_arg = Keyword.drop(opts, [:proc_name, :retry_interval]) + + init_arg = %{ + retry_interval: retry_interval, + open_arg: open_arg, + name: proc_name, + connection: nil + } + + {server_name, init_arg} + end + + @doc """ + Returns a GenServer reference for the connection name + """ + @spec get_server_name(binary | atom) :: binary + def get_server_name(name) do + :"amqp_connection_#{name}" + end + + @doc false + def get_state(name \\ :default) do + GenServer.call(get_server_name(name), :get_state) + end + + @doc """ + Returns a connection referred by the name. + """ + @spec get_connection(binary | atom) :: {:ok, Connection.t()} | {:error, any} + def get_connection(name \\ :default) do + case GenServer.call(get_server_name(name), :get_connection) do + nil -> {:error, :not_connected} + conn -> {:ok, conn} + end + end + + @impl true + def init(state) do + send(self(), :connect) + {:ok, state} + end + + @impl true + def handle_call(:get_state, _, state) do + {:reply, state, state} + end + + def handle_call(:get_connection, _, state) do + {:reply, state[:connection], state} + end + + @impl true + def handle_info(:connect, state) do + case do_open(state[:open_arg]) do + {:ok, conn} -> + # Get notifications when the connection goes down + Process.monitor(conn.pid) + {:noreply, %{state | connection: conn}} + + {:error, _} -> + Logger.error("Failed to connect to AMQP server (#{state[:name]}). Retrying later...") + + # Retry later + Process.send_after(self(), :connect, state[:retry_interval]) + {:noreply, state} + end + end + + def handle_info({:DOWN, _, :process, _pid, reason}, _) do + # Stop GenServer. Will be restarted by Supervisor. + {:stop, {:connection_lost, reason}, nil} + end + + defp do_open(options) do + if url = options[:url] do + Connection.open(url, Keyword.delete(options, :url)) + else + Connection.open(options) + end + end +end From 1b31d81e0e3db21bfc165782977423d36dbe0e4b Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Tue, 29 Dec 2020 17:14:48 +0000 Subject: [PATCH 2/5] Channel management --- lib/amqp/application.ex | 22 ++++- lib/amqp/application/channel.ex | 130 +++++++++++++++++++++++++++++ lib/amqp/application/connection.ex | 23 +++-- 3 files changed, 167 insertions(+), 8 deletions(-) create mode 100644 lib/amqp/application/channel.ex diff --git a/lib/amqp/application.ex b/lib/amqp/application.ex index b129677..34c1d3f 100644 --- a/lib/amqp/application.ex +++ b/lib/amqp/application.ex @@ -7,9 +7,15 @@ defmodule AMQP.Application do @impl true def start(_type, _args) do load_config() - children = load_connections() |> IO.inspect() + children = load_connections() ++ load_channels() + + opts = [ + strategy: :one_for_one, + name: AMQP.Application, + max_restarts: length(children) * 2, + max_seconds: 1 + ] - opts = [strategy: :one_for_one, name: AMQP.Application] Supervisor.start_link(children, opts) end @@ -31,6 +37,18 @@ defmodule AMQP.Application do end) end + defp load_channels do + chan = Application.get_env(:amqp, :channel) + chans = Application.get_env(:amqp, :channels, []) + chans = if chan, do: chans ++ [default: chan], else: chans + + Enum.map(chans, fn {name, opts} -> + arg = opts ++ [proc_name: name] + id = AMQP.Application.Channel.get_server_name(name) + Supervisor.child_spec({AMQP.Application.Channel, arg}, id: id) + end) + end + @doc """ Disables the progress report logging from Erlang library. diff --git a/lib/amqp/application/channel.ex b/lib/amqp/application/channel.ex new file mode 100644 index 0000000..fb3e924 --- /dev/null +++ b/lib/amqp/application/channel.ex @@ -0,0 +1,130 @@ +defmodule AMQP.Application.Channel do + @moduledoc false + + use GenServer + require Logger + alias AMQP.{Channel, Connection} + + @default_interval 5_000 + + @doc """ + Starts a GenServer process linked to the current process. + + ## Examples + + Combines name and retry interval with the connection options. + + iex> opts = [proc_name: :my_chan, retry_interval: 10_000, connection: :my_conn] + iex> :ok = AMQP.Application.Channel.start_link(opts) + iex> {:ok, chan} = AMQP.Application.Connection.get_connection(:my_chan) + + If you omit the proc_name, it uses :default. + + iex> :ok = AMQP.Application.Channel.start_link([]) + iex> {:ok, chan} = AMQP.Application.Connection.get_channel() + iex> {:ok, chan} = AMQP.Application.Connection.get_channel(:default) + """ + @spec start_link(keyword) :: GenServer.on_start() + def start_link(opts) do + {name, init_arg} = link_opts_to_init_arg(opts) + + GenServer.start_link(__MODULE__, init_arg, name: name) + end + + defp link_opts_to_init_arg(opts) do + proc_name = Keyword.get(opts, :proc_name, :default) + server_name = get_server_name(proc_name) + retry_interval = Keyword.get(opts, :retry_interval, @default_interval) + connection = Keyword.get(opts, :connection, proc_name) + + init_arg = %{ + retry_interval: retry_interval, + connection: connection, + name: proc_name, + channel: nil + } + + {server_name, init_arg} + end + + @doc """ + Returns a GenServer reference for the channel name + """ + @spec get_server_name(binary | atom) :: binary + def get_server_name(name) do + :"#{__MODULE__}::#{name}" + end + + @doc false + def get_state(name \\ :default) do + GenServer.call(get_server_name(name), :get_state) + end + + @doc """ + Returns pid for the server referred by the name. + + It is a wrapper of `GenServer.whereis/1`. + """ + @spec whereis(binary() | atom()) :: pid() | {atom(), node()} | nil + def whereis(name) do + name + |> get_server_name() + |> GenServer.whereis() + end + + @doc """ + Returns a channel referred by the name. + """ + @spec get_channel(binary | atom) :: {:ok, Connection.t()} | {:error, any} + def get_channel(name \\ :default) do + case GenServer.call(get_server_name(name), :get_channel) do + nil -> {:error, :channel_not_ready} + channel -> {:ok, channel} + end + end + + @impl true + def init(state) do + send(self(), :open) + {:ok, state} + end + + @impl true + def handle_call(:get_state, _, state) do + {:reply, state, state} + end + + def handle_call(:get_channel, _, state) do + {:reply, state[:channel], state} + end + + @impl true + def handle_info(:open, state) do + case AMQP.Application.Connection.get_connection(state[:connection]) do + {:ok, conn} -> + case Channel.open(conn) do + {:ok, chan} -> + Process.monitor(chan.pid) + {:noreply, %{state | channel: chan}} + + {:error, error} -> + Logger.error("Failed to open an AMQP channel(#{state[:name]}) - #{inspect(error)}") + Process.send_after(self(), :open, state[:retry_interval]) + {:noreply, state} + end + + _error -> + Logger.error( + "Failed to open an AMQP channel(#{state[:name]}). Connection (#{state[:connection]}) is not ready." + ) + + Process.send_after(self(), :open, state[:retry_interval]) + {:noreply, state} + end + end + + def handle_info({:DOWN, _, :process, _pid, reason}, state) do + # Stop GenServer. Will be restarted by Supervisor. + {:stop, {:channel_gone, reason}, nil} + end +end diff --git a/lib/amqp/application/connection.ex b/lib/amqp/application/connection.ex index 7352cf1..5a04450 100644 --- a/lib/amqp/application/connection.ex +++ b/lib/amqp/application/connection.ex @@ -1,7 +1,5 @@ defmodule AMQP.Application.Connection do - @moduledoc """ - - """ + @moduledoc false use GenServer require Logger @@ -25,6 +23,7 @@ defmodule AMQP.Application.Connection do Passes URL instead of options and use a default proc name when you need only a single connection. iex> opts = [url: "amqp://guest:guest@localhost"] + iex> :ok = AMQP.Application.Connection.start_link(opts) iex> {:ok, conn} = AMQP.Application.Connection.get_connection() iex> {:ok, conn} = AMQP.Application.Connection.get_connection(:default) """ @@ -56,7 +55,7 @@ defmodule AMQP.Application.Connection do """ @spec get_server_name(binary | atom) :: binary def get_server_name(name) do - :"amqp_connection_#{name}" + :"#{__MODULE__}::#{name}" end @doc false @@ -64,6 +63,18 @@ defmodule AMQP.Application.Connection do GenServer.call(get_server_name(name), :get_state) end + @doc """ + Returns pid for the server referred by the name. + + It is a wrapper of `GenServer.whereis/1`. + """ + @spec whereis(binary() | atom()) :: pid() | {atom(), node()} | nil + def whereis(name) do + name + |> get_server_name() + |> GenServer.whereis() + end + @doc """ Returns a connection referred by the name. """ @@ -107,9 +118,9 @@ defmodule AMQP.Application.Connection do end end - def handle_info({:DOWN, _, :process, _pid, reason}, _) do + def handle_info({:DOWN, _, :process, _pid, reason}, state) do # Stop GenServer. Will be restarted by Supervisor. - {:stop, {:connection_lost, reason}, nil} + {:stop, {:connection_gone, reason}, nil} end defp do_open(options) do From 73bd733a733f16276ddafda9f3040044e4fb02cb Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sun, 3 Jan 2021 13:36:42 +0000 Subject: [PATCH 3/5] Test Application.Connection --- lib/amqp/application/channel.ex | 2 +- lib/amqp/application/connection.ex | 30 +++++++++++++++++++++++----- test/application/connection_test.exs | 24 ++++++++++++++++++++++ 3 files changed, 50 insertions(+), 6 deletions(-) create mode 100644 test/application/connection_test.exs diff --git a/lib/amqp/application/channel.ex b/lib/amqp/application/channel.ex index fb3e924..3fd19c0 100644 --- a/lib/amqp/application/channel.ex +++ b/lib/amqp/application/channel.ex @@ -123,7 +123,7 @@ defmodule AMQP.Application.Channel do end end - def handle_info({:DOWN, _, :process, _pid, reason}, state) do + def handle_info({:DOWN, _, :process, _pid, reason}, _state) do # Stop GenServer. Will be restarted by Supervisor. {:stop, {:channel_gone, reason}, nil} end diff --git a/lib/amqp/application/connection.ex b/lib/amqp/application/connection.ex index 5a04450..c504710 100644 --- a/lib/amqp/application/connection.ex +++ b/lib/amqp/application/connection.ex @@ -17,7 +17,7 @@ defmodule AMQP.Application.Connection do Combines name and retry interval with the connection options. iex> opts = [proc_name: :my_conn, retry_interval: 10_000, host: "localhost"] - iex> :ok = AMQP.Application.Connection.start_link(opts) + iex> {:ok, pid} = AMQP.Application.Connection.start_link(opts) iex> {:ok, conn} = AMQP.Application.Connection.get_connection(:my_conn) Passes URL instead of options and use a default proc name when you need only a single connection. @@ -89,6 +89,7 @@ defmodule AMQP.Application.Connection do @impl true def init(state) do send(self(), :connect) + Process.flag(:trap_exit, true) {:ok, state} end @@ -110,7 +111,7 @@ defmodule AMQP.Application.Connection do {:noreply, %{state | connection: conn}} {:error, _} -> - Logger.error("Failed to connect to AMQP server (#{state[:name]}). Retrying later...") + Logger.error("Failed to open AMQP connection (#{state[:name]}). Retrying later...") # Retry later Process.send_after(self(), :connect, state[:retry_interval]) @@ -118,9 +119,28 @@ defmodule AMQP.Application.Connection do end end - def handle_info({:DOWN, _, :process, _pid, reason}, state) do - # Stop GenServer. Will be restarted by Supervisor. - {:stop, {:connection_gone, reason}, nil} + def handle_info({:DOWN, _, :process, pid, _reason}, %{connection: %{pid: pid}} = state) when is_pid(pid) do + Logger.info("AMQP connection is gone (#{state[:name]}). Reconnecting...") + send(self(), :connect) + {:noreply, %{state | connection: nil}} + end + + def handle_info({:EXIT, _from, reason}, state) do + close(state[:connection]) + {:stop, reason, %{state | connection: nil}} + end + + @impl true + def terminate(_reason, state) do + close(state[:connection]) + %{state | connection: nil} + end + + defp close(nil), do: :ok + defp close(connection) do + if Process.alive?(connection.pid) do + Connection.close(connection) + end end defp do_open(options) do diff --git a/test/application/connection_test.exs b/test/application/connection_test.exs new file mode 100644 index 0000000..d6ada4b --- /dev/null +++ b/test/application/connection_test.exs @@ -0,0 +1,24 @@ +defmodule AMQP.Application.ConnectionTest do + use ExUnit.Case + alias AMQP.Application.Connection, as: AppConn + + test "opens and accesses to connections" do + opts = [proc_name: :my_conn, retry_interval: 10_000, url: "amqp://guest:guest@localhost"] + {:ok, pid} = AppConn.start_link(opts) + + {:ok, conn} = AppConn.get_connection(:my_conn) + assert %AMQP.Connection{} = conn + + Process.exit(pid, :normal) + end + + test "reconnects when the connection is gone" do + {:ok, _pid} = AppConn.start_link([]) + {:ok, %AMQP.Connection{} = conn1} = AppConn.get_connection() + AMQP.Connection.close(conn1) + :timer.sleep(50) + + assert {:ok, %AMQP.Connection{} = conn2} = AppConn.get_connection() + refute conn1 == conn2 + end +end From 6cf8f4b47fb5a5403bdfb5cffb64baaf069d52ca Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sun, 3 Jan 2021 14:11:08 +0000 Subject: [PATCH 4/5] Test channel --- lib/amqp/application/channel.ex | 34 ++++++++++++++++++++++++---- lib/amqp/application/connection.ex | 30 +++++++++++++----------- test/application/channel_test.exs | 34 ++++++++++++++++++++++++++++ test/application/connection_test.exs | 2 +- 4 files changed, 81 insertions(+), 19 deletions(-) create mode 100644 test/application/channel_test.exs diff --git a/lib/amqp/application/channel.ex b/lib/amqp/application/channel.ex index 3fd19c0..ee11ae0 100644 --- a/lib/amqp/application/channel.ex +++ b/lib/amqp/application/channel.ex @@ -41,6 +41,7 @@ defmodule AMQP.Application.Channel do retry_interval: retry_interval, connection: connection, name: proc_name, + monitor_ref: nil, channel: nil } @@ -86,6 +87,7 @@ defmodule AMQP.Application.Channel do @impl true def init(state) do send(self(), :open) + Process.flag(:trap_exit, true) {:ok, state} end @@ -104,8 +106,8 @@ defmodule AMQP.Application.Channel do {:ok, conn} -> case Channel.open(conn) do {:ok, chan} -> - Process.monitor(chan.pid) - {:noreply, %{state | channel: chan}} + ref = Process.monitor(chan.pid) + {:noreply, %{state | channel: chan, monitor_ref: ref}} {:error, error} -> Logger.error("Failed to open an AMQP channel(#{state[:name]}) - #{inspect(error)}") @@ -123,8 +125,30 @@ defmodule AMQP.Application.Channel do end end - def handle_info({:DOWN, _, :process, _pid, reason}, _state) do - # Stop GenServer. Will be restarted by Supervisor. - {:stop, {:channel_gone, reason}, nil} + def handle_info({:DOWN, _, :process, pid, _reason}, %{channel: %{pid: pid}} = state) + when is_pid(pid) do + Logger.info("AMQP channel is gone (#{state[:name]}). Reopening...") + send(self(), :open) + {:noreply, %{state | channel: nil, monitor_ref: nil}} + end + + def handle_info({:EXIT, _from, reason}, state) do + close(state) + {:stop, reason, %{state | channel: nil, monitor_ref: nil}} + end + + @impl true + def terminate(_reason, state) do + close(state) + %{state | channel: nil, monitor_ref: nil} end + + defp close(%{channel: %Channel{} = channel, monior_ref: ref}) do + if Process.alive?(channel.pid) do + Process.demonitor(ref) + Channel.close(channel) + end + end + + defp close(_), do: :ok end diff --git a/lib/amqp/application/connection.ex b/lib/amqp/application/connection.ex index c504710..ca0d3ca 100644 --- a/lib/amqp/application/connection.ex +++ b/lib/amqp/application/connection.ex @@ -44,7 +44,8 @@ defmodule AMQP.Application.Connection do retry_interval: retry_interval, open_arg: open_arg, name: proc_name, - connection: nil + connection: nil, + monitor_ref: nil } {server_name, init_arg} @@ -107,8 +108,8 @@ defmodule AMQP.Application.Connection do case do_open(state[:open_arg]) do {:ok, conn} -> # Get notifications when the connection goes down - Process.monitor(conn.pid) - {:noreply, %{state | connection: conn}} + ref = Process.monitor(conn.pid) + {:noreply, %{state | connection: conn, monitor_ref: ref}} {:error, _} -> Logger.error("Failed to open AMQP connection (#{state[:name]}). Retrying later...") @@ -119,30 +120,33 @@ defmodule AMQP.Application.Connection do end end - def handle_info({:DOWN, _, :process, pid, _reason}, %{connection: %{pid: pid}} = state) when is_pid(pid) do + def handle_info({:DOWN, _, :process, pid, _reason}, %{connection: %{pid: pid}} = state) + when is_pid(pid) do Logger.info("AMQP connection is gone (#{state[:name]}). Reconnecting...") send(self(), :connect) - {:noreply, %{state | connection: nil}} + {:noreply, %{state | connection: nil, monitor_ref: nil}} end def handle_info({:EXIT, _from, reason}, state) do - close(state[:connection]) - {:stop, reason, %{state | connection: nil}} + close(state) + {:stop, reason, %{state | connection: nil, monitor_ref: nil}} end @impl true def terminate(_reason, state) do - close(state[:connection]) - %{state | connection: nil} + close(state) + %{state | connection: nil, monitor_ref: nil} end - defp close(nil), do: :ok - defp close(connection) do - if Process.alive?(connection.pid) do - Connection.close(connection) + defp close(%{connection: %Connection{} = conn, monior_ref: ref}) do + if Process.alive?(conn.pid) do + Process.demonitor(ref) + Connection.close(conn) end end + defp close(_), do: :ok + defp do_open(options) do if url = options[:url] do Connection.open(url, Keyword.delete(options, :url)) diff --git a/test/application/channel_test.exs b/test/application/channel_test.exs new file mode 100644 index 0000000..7559de5 --- /dev/null +++ b/test/application/channel_test.exs @@ -0,0 +1,34 @@ +defmodule AMQP.Application.ChnnelTest do + use ExUnit.Case + alias AMQP.Application.Connection, as: AppConn + alias AMQP.Application.Channel, as: AppChan + + setup do + {:ok, app_conn_pid} = AppConn.start_link([]) + + on_exit(fn -> + Process.exit(app_conn_pid, :normal) + end) + + [app_conn: app_conn_pid] + end + + test "opens and accesses channel" do + opts = [connection: :default, proc_name: :test_chan] + {:ok, pid} = AppChan.start_link(opts) + + assert {:ok, %AMQP.Channel{}} = AppChan.get_channel(:test_chan) + Process.exit(pid, :normal) + end + + test "reconnects when the channel is gone" do + opts = [connection: :default, proc_name: :test_chan] + {:ok, _pid} = AppChan.start_link(opts) + {:ok, %AMQP.Channel{} = chan1} = AppChan.get_channel(:test_chan) + AMQP.Channel.close(chan1) + :timer.sleep(50) + + assert {:ok, %AMQP.Channel{} = chan2} = AppChan.get_channel(:test_chan) + refute chan1 == chan2 + end +end diff --git a/test/application/connection_test.exs b/test/application/connection_test.exs index d6ada4b..5efd0aa 100644 --- a/test/application/connection_test.exs +++ b/test/application/connection_test.exs @@ -2,7 +2,7 @@ defmodule AMQP.Application.ConnectionTest do use ExUnit.Case alias AMQP.Application.Connection, as: AppConn - test "opens and accesses to connections" do + test "opens and accesses connections" do opts = [proc_name: :my_conn, retry_interval: 10_000, url: "amqp://guest:guest@localhost"] {:ok, pid} = AppConn.start_link(opts) From 2355bfd21f9a60097f5885fcbd89243291721755 Mon Sep 17 00:00:00 2001 From: Tatsuya Ono Date: Sun, 3 Jan 2021 16:02:19 +0000 Subject: [PATCH 5/5] Access via AMQP.Application --- lib/amqp/application.ex | 159 ++++++++++++++++++++++++++++- lib/amqp/application/channel.ex | 18 ++-- lib/amqp/application/connection.ex | 8 +- 3 files changed, 177 insertions(+), 8 deletions(-) diff --git a/lib/amqp/application.ex b/lib/amqp/application.ex index 34c1d3f..0782627 100644 --- a/lib/amqp/application.ex +++ b/lib/amqp/application.ex @@ -1,5 +1,7 @@ defmodule AMQP.Application do - @moduledoc false + @moduledoc """ + Provides access to configured connections and channels. + """ use Application require Logger @@ -79,4 +81,159 @@ defmodule AMQP.Application do error -> error end end + + @doc """ + Provides an easy way to access an AMQP connection. + + The connection will be monitored by AMQP's GenServer and it will automatically try to reconnect when the connection is gone. + + ## Usage + + When you want to have a single connection in your app: + + config :amqp, connection: [ + url: "amqp://guest:guest@localhost:15672" + ] + + You can also use any options available on `AMQP.Connection.open/2`: + + config :amqp, connection: [ + host: "localhost", + port: 15672 + username: "guest", + password: "guest" + ] + + Then the connection will be open at the start of the application and you can access via this function. + + iex> {:ok, conn} = AMQP.Application.get_connection() + + By default, it tries to connect to your local RabbitMQ. You can simply pass the empty keyword list too: + + config :amqp, connection: [] # == [url: "amqp://0.0.0.0"] + + You can set up multiple connections wth `:connections` key: + + config :amqp, connections: [ + business_report: [ + url: "amqp://host1" + ], + analytics: [ + url: "amqp://host2" + ] + ] + + Then you can access each connection with its name. + + iex> {:ok, conn1} = AMQP.Application.get_connection(:business_report) + iex> {:ok, conn2} = AMQP.Application.get_connection(:analytics) + + The defaut name is :default so These two configurations are equivalent: + + config :amqp, connection: [] + config :amqp, connections: [default: []] + + ## Configuration options + + * `:retry_interval` - The retry interval in milliseconds when the connection is failed to open (default `5000`) + * `:url` - AMQP URI for the connection + + See also `AMQP.Connection.open/2` for all available options. + """ + @spec get_connection(binary | atom) :: {:ok, AMQP.Connection.t()} | {:error, any} + def get_connection(name \\ :default) do + AMQP.Application.Connection.get_connection(name) + end + + @doc """ + Provides an easy way to access an AMQP channel. + + AMQP.Application provides a wrapper on top of `AMQP.Channel` with . + The channel will be monitored by AMQP's GenServer and it will automatically try to reopen when the channel is gone. + + ## Usage + + When you want to have a single channel in your app: + + config :amqp, + connection: [url: "amqp://guest:guest@localhost:15672"], + channel: [] + + Then the channel will be open at the start of the application and you can access it via this function. + + iex> {:ok, chan} = AMQP.Application.get_channel() + + You can also set up multiple channels wth `:channels` key: + + config :amqp, + connections: [ + business_report: [url: "amqp://host1"], + analytics: [url: "amqp://host2"] + ], + channels: [ + bisiness_report: [connection: :business_report], + analytics: [connection: :analytics] + ] + + Then you can access each channel with its name. + + iex> {:ok, conn1} = AMQP.Application.get_channel(:business_report) + iex> {:ok, conn2} = AMQP.Application.get_channel(:analytics) + + You can also have multiple channels for a single connection. + + config :amqp, + connection: [], + channels: [ + consumer: [], + producer: [] + ] + + ## Configuration options + + * `:connection` - The connection name configured with `connection` or `connections` (default `:default`) + * `:retry_interval` - The retry interval in milliseconds when the channel is failed to open (default `5000`) + + ## Caveat + + Although AMQP will reopen the named channel automatically when it is closed for some reasons, + your application still needs to monitor the channel for a consumer process. + Be aware the channel reponed doesn't automatically recover the subscription of your consumer + + Here is a sample GenServer module that monitors the channel and re-subscribe the channel. + + defmodule AppConsumer do + use GenServer + @channel :default + @queue "myqueue" + + .... + + def handle_info(:subscribe, state) do + subscribe() + {noreply, state} + end + + def handle_info({:DOWN, _, :process, pid, reason}, state) do + send(self(), :subscribe) + {:noreply, state} + end + + defp subscribe() do + case AMQP.Application.get_channel(@channel) do + {:ok, chan} -> + Process.monitor(chan.pid) + AMQP.Basic.consume(@channel, @queue) + + _error -> + Process.send_after(self(), :subscribe, 1000) + {:error, :retrying} + end + end + end + """ + @spec get_channel(binary | atom) :: {:ok, AMQP.Channel.t()} | {:error, any} + def get_channel(name \\ :default) do + AMQP.Application.Channel.get_channel(name) + end end diff --git a/lib/amqp/application/channel.ex b/lib/amqp/application/channel.ex index ee11ae0..ddd1939 100644 --- a/lib/amqp/application/channel.ex +++ b/lib/amqp/application/channel.ex @@ -1,9 +1,11 @@ defmodule AMQP.Application.Channel do @moduledoc false + # This module will stay as a private module at least during 2.0.x. + # There might be non backward compatible changes on this module on 2.1.x. use GenServer require Logger - alias AMQP.{Channel, Connection} + alias AMQP.Channel @default_interval 5_000 @@ -16,13 +18,13 @@ defmodule AMQP.Application.Channel do iex> opts = [proc_name: :my_chan, retry_interval: 10_000, connection: :my_conn] iex> :ok = AMQP.Application.Channel.start_link(opts) - iex> {:ok, chan} = AMQP.Application.Connection.get_connection(:my_chan) + iex> {:ok, chan} = AMQP.Application.Channel.get_channel(:my_chan) If you omit the proc_name, it uses :default. iex> :ok = AMQP.Application.Channel.start_link([]) - iex> {:ok, chan} = AMQP.Application.Connection.get_channel() - iex> {:ok, chan} = AMQP.Application.Connection.get_channel(:default) + iex> {:ok, chan} = AMQP.Application.Channel.get_channel() + iex> {:ok, chan} = AMQP.Application.Channel.get_channel(:default) """ @spec start_link(keyword) :: GenServer.on_start() def start_link(opts) do @@ -76,7 +78,7 @@ defmodule AMQP.Application.Channel do @doc """ Returns a channel referred by the name. """ - @spec get_channel(binary | atom) :: {:ok, Connection.t()} | {:error, any} + @spec get_channel(binary | atom) :: {:ok, Channel.t()} | {:error, any} def get_channel(name \\ :default) do case GenServer.call(get_server_name(name), :get_channel) do nil -> {:error, :channel_not_ready} @@ -97,7 +99,11 @@ defmodule AMQP.Application.Channel do end def handle_call(:get_channel, _, state) do - {:reply, state[:channel], state} + if state[:channel] && Process.alive?(state[:channel].pid) do + {:reply, state[:channel], state} + else + {:reply, nil, state} + end end @impl true diff --git a/lib/amqp/application/connection.ex b/lib/amqp/application/connection.ex index ca0d3ca..4557ae7 100644 --- a/lib/amqp/application/connection.ex +++ b/lib/amqp/application/connection.ex @@ -1,5 +1,7 @@ defmodule AMQP.Application.Connection do @moduledoc false + # This module will stay as a private module at least during 2.0.x. + # There might be non backward compatible changes on this module on 2.1.x. use GenServer require Logger @@ -100,7 +102,11 @@ defmodule AMQP.Application.Connection do end def handle_call(:get_connection, _, state) do - {:reply, state[:connection], state} + if state[:connection] && Process.alive?(state[:connection].pid) do + {:reply, state[:connection], state} + else + {:reply, nil, state} + end end @impl true