Skip to content

Commit

Permalink
Merge pull request #188 from lucacorti/maint/2.0
Browse files Browse the repository at this point in the history
2.0.0
  • Loading branch information
lucacorti authored Nov 9, 2024
2 parents 65e0964 + 0fa9c95 commit 66ec450
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 87 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ jobs:
strategy:
matrix:
# For details see: https://hexdocs.pm/elixir/compatibility-and-deprecations.html#compatibility-between-elixir-and-erlang-otp
elixir: ["1.17", "1.16", "1.15", "1.14"]
elixir: ["1.17", "1.16", "1.15"]
otp: ["27", "26"]
exclude:
- { otp: "27", elixir: "1.16" }
- { otp: "27", elixir: "1.15" }
- { otp: "27", elixir: "1.14" }
- { otp: "26", elixir: "1.14" }
steps:
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
Expand Down
212 changes: 132 additions & 80 deletions lib/lapin/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ defmodule Lapin.Connection do
to publish messages on the connection configured for the implementing module.
"""

use Connection

require Logger

alias Lapin.{Consumer, Exchange, Message, Producer, Queue}
Expand All @@ -38,16 +36,16 @@ defmodule Lapin.Connection do
@type config :: [consumers: [Consumer.config()], producers: [Producer.config()]]

@typedoc "Connection"
@type t :: GenServer.server()
@type t :: :gen_statem.server_ref()

@typedoc "Callback result"
@type on_callback :: :ok | {:error, message :: String.t()}

@typedoc "Reason for message rejection"
@type reason :: term
@type reason :: term()

@typedoc "`handle_deliver/2` callback result"
@type on_deliver :: :ok | {:reject, reason} | term
@type on_deliver :: :ok | {:reject, reason} | term()

@doc """
Called when receiving a `basic.cancel` from the broker.
Expand Down Expand Up @@ -138,25 +136,14 @@ defmodule Lapin.Connection do
@spec start_link(config, options :: GenServer.options()) :: GenServer.on_start()
def start_link(configuration, options \\ []) do
{:ok, configuration} = cleanup_configuration(configuration)
Connection.start_link(__MODULE__, configuration, options)
end

def init(configuration) do
{
:connect,
:init,
%{configuration: configuration, consumers: [], producers: [], connection: nil, module: nil}
}
:gen_statem.start_link({:local, options[:name]}, __MODULE__, configuration, [])
end

@doc """
Closes the connection
"""
@spec close(connection :: t) :: on_callback()
def close(connection), do: GenServer.stop(connection)

def terminate(_reason, %{connection: nil}), do: :ok
def terminate(_reason, %{connection: connection}), do: AMQP.Connection.close(connection)
def close(connection), do: :gen_statem.stop(connection)

@doc """
Publishes a message to the specified exchange with the given routing_key
Expand All @@ -169,20 +156,51 @@ defmodule Lapin.Connection do
options :: Keyword.t()
) :: on_callback
def publish(connection, exchange, routing_key, payload, options \\ []),
do: Connection.call(connection, {:publish, exchange, routing_key, payload, options})
do: :gen_statem.call(connection, {:publish, exchange, routing_key, payload, options})

@behaviour :gen_statem

@impl :gen_statem
def callback_mode, do: [:handle_event_function, :state_enter]

@impl :gen_statem
def init(configuration) do
{
:ok,
:disconnected,
%{
configuration: configuration,
connection: nil,
consumers: [],
producers: [],
module: nil
},
{:next_event, :internal, :connect}
}
end

@impl :gen_statem
def handle_event(:enter, old_state, current_state, _data) do
Logger.info("#{inspect(old_state)} -> #{inspect(current_state)}")
:keep_state_and_data
end

def handle_call(
@impl :gen_statem
def handle_event(
{:call, from},
{:publish, _exchange, _routing_key, _payload, _options},
_from,
%{connection: nil} = state
:disconnected,
%{connection: nil}
) do
{:reply, {:error, :not_connected}, state}
{:keep_state_and_data, {:reply, from, {:error, :not_connected}}}
end

def handle_call(
@impl :gen_statem
def handle_event(
{:call, from},
{:publish, exchange, routing_key, payload, options},
_from,
%{producers: producers, module: module} = state
:connected,
%{producers: producers, module: module}
) do
with {:ok, %Producer{pattern: pattern} = producer} <- Producer.get(producers, exchange),
mandatory = pattern.mandatory(producer),
Expand All @@ -195,22 +213,25 @@ defmodule Lapin.Connection do

if not pattern.confirm(producer) or Producer.confirm(producer) do
Logger.debug(fn -> "Published #{inspect(message)} on #{inspect(producer)}" end)
{:reply, module.handle_publish(producer, message), state}
{:keep_state_and_data, {:reply, from, module.handle_publish(producer, message)}}
else
error = "Error publishing #{inspect(message)}"
Logger.debug(fn -> error end)
{:reply, {:error, error}, state}
{:keep_state_and_data, {:reply, from, {:error, error}}}
end
else
{:error, error} ->
Logger.debug(fn -> "Error sending message: #{inspect(error)}" end)
{:reply, {:error, error}, state}
{:keep_state_and_data, {:reply, from, {:error, error}}}
end
end

def handle_info(
@impl :gen_statem
def handle_event(
:info,
{:basic_cancel, %{consumer_tag: consumer_tag}},
%{consumers: consumers, module: module} = state
:connected,
%{consumers: consumers, module: module}
) do
case Consumer.get(consumers, consumer_tag) do
{:ok, consumer} ->
Expand All @@ -223,12 +244,15 @@ defmodule Lapin.Connection do
)
end

{:stop, :normal, state}
{:stop, :normal}
end

def handle_info(
@impl :gen_statem
def handle_event(
:info,
{:basic_cancel_ok, %{consumer_tag: consumer_tag}},
%{consumers: consumers, module: module} = state
:connected,
%{consumers: consumers, module: module}
) do
with {:ok, consumer} <- Consumer.get(consumers, consumer_tag),
:ok <- module.handle_cancel_ok(consumer) do
Expand All @@ -243,12 +267,15 @@ defmodule Lapin.Connection do
Logger.error("Error handling broker cancel for '#{consumer_tag}': #{inspect(error)}")
end

{:noreply, state}
:keep_state_and_data
end

def handle_info(
@impl :gen_statem
def handle_event(
:info,
{:basic_consume_ok, %{consumer_tag: consumer_tag}},
%{consumers: consumers, module: module} = state
:connected,
%{consumers: consumers, module: module}
) do
with {:ok, consumer} <- Consumer.get(consumers, consumer_tag),
:ok <- module.handle_consume_ok(consumer) do
Expand All @@ -263,17 +290,20 @@ defmodule Lapin.Connection do
Logger.error("Error handling broker register for '#{consumer_tag}': #{inspect(error)}")
end

{:noreply, state}
:keep_state_and_data
end

def handle_info(
@impl :gen_statem
def handle_event(
:info,
{:basic_return, payload, %{exchange: exchange} = meta},
%{producers: producers, module: module} = state
:connected,
%{producers: producers, module: module}
) do
message = %Message{meta: meta, payload: payload}

with {:ok, producer} <- Producer.get(producers, exchange),
:ok <- module.handle_return(producer, message) do
:ok <- module.handle_return(producer, %Message{meta: meta, payload: payload}) do
Logger.debug(fn -> "Broker returned message #{inspect(message)}" end)
else
{:error, :not_found} ->
Expand All @@ -283,17 +313,21 @@ defmodule Lapin.Connection do
Logger.debug(fn -> "Error handling returned message: #{inspect(error)}" end)
end

{:noreply, state}
:keep_state_and_data
end

def handle_info({:DOWN, _, :process, _pid, _reason}, state) do
Logger.warning("Connection down, restarting...")
{:stop, :normal, %{state | connection: nil}}
@impl :gen_statem
def handle_event(:info, {:DOWN, _, :process, _pid, _reason}, :connected, _data) do
Logger.warning("Connection down, reconnecting...")
{:stop, :normal}
end

def handle_info(
@impl :gen_statem
def handle_event(
:info,
{:basic_deliver, payload, %{consumer_tag: consumer_tag} = meta},
%{consumers: consumers, module: module} = state
:connected,
%{consumers: consumers, module: module}
) do
message = %Message{meta: meta, payload: payload}

Expand All @@ -305,9 +339,59 @@ defmodule Lapin.Connection do
Logger.error("Error processing message #{inspect(message)}, no local consumer")
end

{:noreply, state}
:keep_state_and_data
end

@impl :gen_statem
def handle_event(:internal, :disconnect, _state, data) do
{
:next_state,
:disconnected,
data,
{:state_timeout, @backoff, nil}
}
end

@impl :gen_statem
def handle_event(:internal, :connect, :disconnected, %{configuration: configuration} = data) do
module = Keyword.get(configuration, :module)

with configuration <- Keyword.merge(@connection_default_params, configuration),
{:ok, connection} <- AMQP.Connection.open(configuration),
_ref = Process.monitor(connection.pid),
{:ok, config_channel} <- AMQP.Channel.open(connection),
{:ok, exchanges} <- declare_exchanges(configuration, config_channel),
{:ok, queues} <- declare_queues(configuration, config_channel),
:ok <- bind_exchanges(exchanges, config_channel),
:ok <- bind_queues(queues, config_channel),
{:ok, producers} <- create_producers(configuration, connection),
{:ok, consumers} <- create_consumers(configuration, connection),
:ok <- AMQP.Channel.close(config_channel) do
{
:next_state,
:connected,
%{
data
| module: module,
producers: producers,
consumers: consumers,
connection: connection
}
}
else
{:error, error} ->
Logger.error(fn ->
"Connection error: #{inspect(error)} for #{module}, backing off for #{@backoff}"
end)

{:keep_state_and_data, {:state_timeout, @backoff, nil}}
end
end

@impl :gen_statem
def handle_event(:state_timeout, _event, :disconnected, _data),
do: {:keep_state_and_data, {:next_event, :internal, :connect}}

defp consume(
module,
%Consumer{pattern: pattern} = consumer,
Expand Down Expand Up @@ -378,38 +462,6 @@ defmodule Lapin.Connection do
:ok
end

def connect(_info, %{configuration: configuration} = state) do
module = Keyword.get(configuration, :module)

with configuration <- Keyword.merge(@connection_default_params, configuration),
{:ok, connection} <- AMQP.Connection.open(configuration),
_ref = Process.monitor(connection.pid),
{:ok, config_channel} <- AMQP.Channel.open(connection),
{:ok, exchanges} <- declare_exchanges(configuration, config_channel),
{:ok, queues} <- declare_queues(configuration, config_channel),
:ok <- bind_exchanges(exchanges, config_channel),
:ok <- bind_queues(queues, config_channel),
{:ok, producers} <- create_producers(configuration, connection),
{:ok, consumers} <- create_consumers(configuration, connection),
:ok <- AMQP.Channel.close(config_channel) do
{:ok,
%{
state
| module: module,
producers: producers,
consumers: consumers,
connection: connection
}}
else
{:error, error} ->
Logger.error(fn ->
"Connection error: #{inspect(error)} for #{module}, backing off for #{@backoff}"
end)

{:backoff, @backoff, state}
end
end

defp declare_exchanges(configuration, channel) do
exchanges =
configuration
Expand Down
5 changes: 2 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ defmodule Lapin.Mixfile do
def project do
[
app: :lapin,
version: "1.0.7",
elixir: "~> 1.12",
version: "2.0.0",
elixir: "~> 1.15",
description: "Elixir RabbitMQ Client",
source_url: "https://github.com/lucacorti/lapin",
package: package(),
Expand All @@ -30,7 +30,6 @@ defmodule Lapin.Mixfile do
defp deps do
[
{:amqp, "~> 4.0"},
{:connection, "~> 1.0"},
{:ex_doc, ">= 0.0.0", only: [:dev, :test], runtime: false},
{:credo, ">= 0.0.0", only: [:dev], runtime: false},
{:dialyxir, ">= 0.0.0", only: [:dev], runtime: false}
Expand Down
1 change: 0 additions & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"amqp": {:hex, :amqp, "4.0.0", "c62c0eba8ad5f5bbebf668ca4a68bbf8d9e35c9b3bc8703ff679c01f3e6899d3", [:mix], [{:amqp_client, "~> 4.0", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "4148c54dc35733e6c2f9208ff26bc61601cde2da993f752a3452442b018d5735"},
"amqp_client": {:hex, :amqp_client, "4.0.3", "c7dcc8031c780cd39ec586ba827a8eb26e006e9761af8d3f58fded11f645ebd4", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:rabbit_common, "4.0.3", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "ae945f7280617e9a4b17a6d49e3a2f496d716e8088ec29d8e94ecc79e5da7458"},
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"credentials_obfuscation": {:hex, :credentials_obfuscation, "3.4.0", "34e18b126b3aefd6e8143776fbe1ceceea6792307c99ac5ee8687911f048cfd7", [:rebar3], [], "hexpm", "738ace0ed5545d2710d3f7383906fc6f6b582d019036e5269c4dbd85dbced566"},
"credo": {:hex, :credo, "1.7.10", "6e64fe59be8da5e30a1b96273b247b5cf1cc9e336b5fd66302a64b25749ad44d", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "71fbc9a6b8be21d993deca85bf151df023a3097b01e09a2809d460348561d8cd"},
"dialyxir": {:hex, :dialyxir, "1.4.4", "fb3ce8741edeaea59c9ae84d5cec75da00fa89fe401c72d6e047d11a61f65f70", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "cd6111e8017ccd563e65621a4d9a4a1c5cd333df30cebc7face8029cacb4eff6"},
Expand Down
1 change: 1 addition & 0 deletions test/lapin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule LapinTest do
use Lapin.Connection
require Logger

@impl Lapin.Connection
def handle_deliver(consumer, message) do
Logger.debug(fn ->
"Consuming message #{inspect(message, pretty: true)} received on #{inspect(consumer, pretty: true)}"
Expand Down

0 comments on commit 66ec450

Please sign in to comment.