diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c8ada32..0c202ec 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -22,13 +22,11 @@ jobs: strategy: matrix: # For details see: https://hexdocs.pm/elixir/compatibility-and-deprecations.html#compatibility-between-elixir-and-erlang-otp - elixir: ["1.17", "1.16", "1.15", "1.14"] + elixir: ["1.17", "1.16", "1.15"] otp: ["27", "26"] exclude: - { otp: "27", elixir: "1.16" } - { otp: "27", elixir: "1.15" } - - { otp: "27", elixir: "1.14" } - - { otp: "26", elixir: "1.14" } steps: - uses: actions/checkout@v4 - uses: erlef/setup-beam@v1 diff --git a/lib/lapin/connection.ex b/lib/lapin/connection.ex index 4c27ed4..51bf63d 100644 --- a/lib/lapin/connection.ex +++ b/lib/lapin/connection.ex @@ -12,8 +12,6 @@ defmodule Lapin.Connection do to publish messages on the connection configured for the implementing module. """ - use Connection - require Logger alias Lapin.{Consumer, Exchange, Message, Producer, Queue} @@ -38,16 +36,16 @@ defmodule Lapin.Connection do @type config :: [consumers: [Consumer.config()], producers: [Producer.config()]] @typedoc "Connection" - @type t :: GenServer.server() + @type t :: :gen_statem.server_ref() @typedoc "Callback result" @type on_callback :: :ok | {:error, message :: String.t()} @typedoc "Reason for message rejection" - @type reason :: term + @type reason :: term() @typedoc "`handle_deliver/2` callback result" - @type on_deliver :: :ok | {:reject, reason} | term + @type on_deliver :: :ok | {:reject, reason} | term() @doc """ Called when receiving a `basic.cancel` from the broker. @@ -138,25 +136,14 @@ defmodule Lapin.Connection do @spec start_link(config, options :: GenServer.options()) :: GenServer.on_start() def start_link(configuration, options \\ []) do {:ok, configuration} = cleanup_configuration(configuration) - Connection.start_link(__MODULE__, configuration, options) - end - - def init(configuration) do - { - :connect, - :init, - %{configuration: configuration, consumers: [], producers: [], connection: nil, module: nil} - } + :gen_statem.start_link({:local, options[:name]}, __MODULE__, configuration, []) end @doc """ Closes the connection """ @spec close(connection :: t) :: on_callback() - def close(connection), do: GenServer.stop(connection) - - def terminate(_reason, %{connection: nil}), do: :ok - def terminate(_reason, %{connection: connection}), do: AMQP.Connection.close(connection) + def close(connection), do: :gen_statem.stop(connection) @doc """ Publishes a message to the specified exchange with the given routing_key @@ -169,20 +156,51 @@ defmodule Lapin.Connection do options :: Keyword.t() ) :: on_callback def publish(connection, exchange, routing_key, payload, options \\ []), - do: Connection.call(connection, {:publish, exchange, routing_key, payload, options}) + do: :gen_statem.call(connection, {:publish, exchange, routing_key, payload, options}) + + @behaviour :gen_statem + + @impl :gen_statem + def callback_mode, do: [:handle_event_function, :state_enter] + + @impl :gen_statem + def init(configuration) do + { + :ok, + :disconnected, + %{ + configuration: configuration, + connection: nil, + consumers: [], + producers: [], + module: nil + }, + {:next_event, :internal, :connect} + } + end + + @impl :gen_statem + def handle_event(:enter, old_state, current_state, _data) do + Logger.info("#{inspect(old_state)} -> #{inspect(current_state)}") + :keep_state_and_data + end - def handle_call( + @impl :gen_statem + def handle_event( + {:call, from}, {:publish, _exchange, _routing_key, _payload, _options}, - _from, - %{connection: nil} = state + :disconnected, + %{connection: nil} ) do - {:reply, {:error, :not_connected}, state} + {:keep_state_and_data, {:reply, from, {:error, :not_connected}}} end - def handle_call( + @impl :gen_statem + def handle_event( + {:call, from}, {:publish, exchange, routing_key, payload, options}, - _from, - %{producers: producers, module: module} = state + :connected, + %{producers: producers, module: module} ) do with {:ok, %Producer{pattern: pattern} = producer} <- Producer.get(producers, exchange), mandatory = pattern.mandatory(producer), @@ -195,22 +213,25 @@ defmodule Lapin.Connection do if not pattern.confirm(producer) or Producer.confirm(producer) do Logger.debug(fn -> "Published #{inspect(message)} on #{inspect(producer)}" end) - {:reply, module.handle_publish(producer, message), state} + {:keep_state_and_data, {:reply, from, module.handle_publish(producer, message)}} else error = "Error publishing #{inspect(message)}" Logger.debug(fn -> error end) - {:reply, {:error, error}, state} + {:keep_state_and_data, {:reply, from, {:error, error}}} end else {:error, error} -> Logger.debug(fn -> "Error sending message: #{inspect(error)}" end) - {:reply, {:error, error}, state} + {:keep_state_and_data, {:reply, from, {:error, error}}} end end - def handle_info( + @impl :gen_statem + def handle_event( + :info, {:basic_cancel, %{consumer_tag: consumer_tag}}, - %{consumers: consumers, module: module} = state + :connected, + %{consumers: consumers, module: module} ) do case Consumer.get(consumers, consumer_tag) do {:ok, consumer} -> @@ -223,12 +244,15 @@ defmodule Lapin.Connection do ) end - {:stop, :normal, state} + {:stop, :normal} end - def handle_info( + @impl :gen_statem + def handle_event( + :info, {:basic_cancel_ok, %{consumer_tag: consumer_tag}}, - %{consumers: consumers, module: module} = state + :connected, + %{consumers: consumers, module: module} ) do with {:ok, consumer} <- Consumer.get(consumers, consumer_tag), :ok <- module.handle_cancel_ok(consumer) do @@ -243,12 +267,15 @@ defmodule Lapin.Connection do Logger.error("Error handling broker cancel for '#{consumer_tag}': #{inspect(error)}") end - {:noreply, state} + :keep_state_and_data end - def handle_info( + @impl :gen_statem + def handle_event( + :info, {:basic_consume_ok, %{consumer_tag: consumer_tag}}, - %{consumers: consumers, module: module} = state + :connected, + %{consumers: consumers, module: module} ) do with {:ok, consumer} <- Consumer.get(consumers, consumer_tag), :ok <- module.handle_consume_ok(consumer) do @@ -263,17 +290,20 @@ defmodule Lapin.Connection do Logger.error("Error handling broker register for '#{consumer_tag}': #{inspect(error)}") end - {:noreply, state} + :keep_state_and_data end - def handle_info( + @impl :gen_statem + def handle_event( + :info, {:basic_return, payload, %{exchange: exchange} = meta}, - %{producers: producers, module: module} = state + :connected, + %{producers: producers, module: module} ) do message = %Message{meta: meta, payload: payload} with {:ok, producer} <- Producer.get(producers, exchange), - :ok <- module.handle_return(producer, message) do + :ok <- module.handle_return(producer, %Message{meta: meta, payload: payload}) do Logger.debug(fn -> "Broker returned message #{inspect(message)}" end) else {:error, :not_found} -> @@ -283,17 +313,21 @@ defmodule Lapin.Connection do Logger.debug(fn -> "Error handling returned message: #{inspect(error)}" end) end - {:noreply, state} + :keep_state_and_data end - def handle_info({:DOWN, _, :process, _pid, _reason}, state) do - Logger.warning("Connection down, restarting...") - {:stop, :normal, %{state | connection: nil}} + @impl :gen_statem + def handle_event(:info, {:DOWN, _, :process, _pid, _reason}, :connected, _data) do + Logger.warning("Connection down, reconnecting...") + {:stop, :normal} end - def handle_info( + @impl :gen_statem + def handle_event( + :info, {:basic_deliver, payload, %{consumer_tag: consumer_tag} = meta}, - %{consumers: consumers, module: module} = state + :connected, + %{consumers: consumers, module: module} ) do message = %Message{meta: meta, payload: payload} @@ -305,9 +339,59 @@ defmodule Lapin.Connection do Logger.error("Error processing message #{inspect(message)}, no local consumer") end - {:noreply, state} + :keep_state_and_data + end + + @impl :gen_statem + def handle_event(:internal, :disconnect, _state, data) do + { + :next_state, + :disconnected, + data, + {:state_timeout, @backoff, nil} + } + end + + @impl :gen_statem + def handle_event(:internal, :connect, :disconnected, %{configuration: configuration} = data) do + module = Keyword.get(configuration, :module) + + with configuration <- Keyword.merge(@connection_default_params, configuration), + {:ok, connection} <- AMQP.Connection.open(configuration), + _ref = Process.monitor(connection.pid), + {:ok, config_channel} <- AMQP.Channel.open(connection), + {:ok, exchanges} <- declare_exchanges(configuration, config_channel), + {:ok, queues} <- declare_queues(configuration, config_channel), + :ok <- bind_exchanges(exchanges, config_channel), + :ok <- bind_queues(queues, config_channel), + {:ok, producers} <- create_producers(configuration, connection), + {:ok, consumers} <- create_consumers(configuration, connection), + :ok <- AMQP.Channel.close(config_channel) do + { + :next_state, + :connected, + %{ + data + | module: module, + producers: producers, + consumers: consumers, + connection: connection + } + } + else + {:error, error} -> + Logger.error(fn -> + "Connection error: #{inspect(error)} for #{module}, backing off for #{@backoff}" + end) + + {:keep_state_and_data, {:state_timeout, @backoff, nil}} + end end + @impl :gen_statem + def handle_event(:state_timeout, _event, :disconnected, _data), + do: {:keep_state_and_data, {:next_event, :internal, :connect}} + defp consume( module, %Consumer{pattern: pattern} = consumer, @@ -378,38 +462,6 @@ defmodule Lapin.Connection do :ok end - def connect(_info, %{configuration: configuration} = state) do - module = Keyword.get(configuration, :module) - - with configuration <- Keyword.merge(@connection_default_params, configuration), - {:ok, connection} <- AMQP.Connection.open(configuration), - _ref = Process.monitor(connection.pid), - {:ok, config_channel} <- AMQP.Channel.open(connection), - {:ok, exchanges} <- declare_exchanges(configuration, config_channel), - {:ok, queues} <- declare_queues(configuration, config_channel), - :ok <- bind_exchanges(exchanges, config_channel), - :ok <- bind_queues(queues, config_channel), - {:ok, producers} <- create_producers(configuration, connection), - {:ok, consumers} <- create_consumers(configuration, connection), - :ok <- AMQP.Channel.close(config_channel) do - {:ok, - %{ - state - | module: module, - producers: producers, - consumers: consumers, - connection: connection - }} - else - {:error, error} -> - Logger.error(fn -> - "Connection error: #{inspect(error)} for #{module}, backing off for #{@backoff}" - end) - - {:backoff, @backoff, state} - end - end - defp declare_exchanges(configuration, channel) do exchanges = configuration diff --git a/mix.exs b/mix.exs index 3c884fd..d1496a6 100644 --- a/mix.exs +++ b/mix.exs @@ -4,8 +4,8 @@ defmodule Lapin.Mixfile do def project do [ app: :lapin, - version: "1.0.7", - elixir: "~> 1.12", + version: "2.0.0", + elixir: "~> 1.15", description: "Elixir RabbitMQ Client", source_url: "https://github.com/lucacorti/lapin", package: package(), @@ -30,7 +30,6 @@ defmodule Lapin.Mixfile do defp deps do [ {:amqp, "~> 4.0"}, - {:connection, "~> 1.0"}, {:ex_doc, ">= 0.0.0", only: [:dev, :test], runtime: false}, {:credo, ">= 0.0.0", only: [:dev], runtime: false}, {:dialyxir, ">= 0.0.0", only: [:dev], runtime: false} diff --git a/mix.lock b/mix.lock index e6f2480..e821a80 100644 --- a/mix.lock +++ b/mix.lock @@ -2,7 +2,6 @@ "amqp": {:hex, :amqp, "4.0.0", "c62c0eba8ad5f5bbebf668ca4a68bbf8d9e35c9b3bc8703ff679c01f3e6899d3", [:mix], [{:amqp_client, "~> 4.0", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "4148c54dc35733e6c2f9208ff26bc61601cde2da993f752a3452442b018d5735"}, "amqp_client": {:hex, :amqp_client, "4.0.3", "c7dcc8031c780cd39ec586ba827a8eb26e006e9761af8d3f58fded11f645ebd4", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:rabbit_common, "4.0.3", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "ae945f7280617e9a4b17a6d49e3a2f496d716e8088ec29d8e94ecc79e5da7458"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, - "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, "credentials_obfuscation": {:hex, :credentials_obfuscation, "3.4.0", "34e18b126b3aefd6e8143776fbe1ceceea6792307c99ac5ee8687911f048cfd7", [:rebar3], [], "hexpm", "738ace0ed5545d2710d3f7383906fc6f6b582d019036e5269c4dbd85dbced566"}, "credo": {:hex, :credo, "1.7.10", "6e64fe59be8da5e30a1b96273b247b5cf1cc9e336b5fd66302a64b25749ad44d", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "71fbc9a6b8be21d993deca85bf151df023a3097b01e09a2809d460348561d8cd"}, "dialyxir": {:hex, :dialyxir, "1.4.4", "fb3ce8741edeaea59c9ae84d5cec75da00fa89fe401c72d6e047d11a61f65f70", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "cd6111e8017ccd563e65621a4d9a4a1c5cd333df30cebc7face8029cacb4eff6"}, diff --git a/test/lapin_test.exs b/test/lapin_test.exs index 6d0587b..f89e2cf 100644 --- a/test/lapin_test.exs +++ b/test/lapin_test.exs @@ -6,6 +6,7 @@ defmodule LapinTest do use Lapin.Connection require Logger + @impl Lapin.Connection def handle_deliver(consumer, message) do Logger.debug(fn -> "Consuming message #{inspect(message, pretty: true)} received on #{inspect(consumer, pretty: true)}"