Skip to content

Commit

Permalink
Merge pull request #187 from lucacorti/cleanup/amqp-4.0
Browse files Browse the repository at this point in the history
Cleanup
  • Loading branch information
lucacorti authored Nov 9, 2024
2 parents 0ffc78a + f201c91 commit 65e0964
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 56 deletions.
22 changes: 6 additions & 16 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
types: [opened, reopened, synchronize]
push:
branches:
- 'master'
- "master"

jobs:
test:
Expand All @@ -22,23 +22,13 @@ jobs:
strategy:
matrix:
# For details see: https://hexdocs.pm/elixir/compatibility-and-deprecations.html#compatibility-between-elixir-and-erlang-otp
elixir: ["1.17", "1.16", "1.15", "1.14", "1.13"]
otp: ["27", "26", "25", "24", "23"]
elixir: ["1.17", "1.16", "1.15", "1.14"]
otp: ["27", "26"]
exclude:
- { otp: "24", elixir: "1.17" }
- { otp: "23", elixir: "1.17" }
- { otp: "22", elixir: "1.17" }
- { otp: "27", elixir: "1.16" }
- { otp: "23", elixir: "1.16" }
- { otp: "22", elixir: "1.16" }
- { otp: "27", elixir: "1.15" }
- { otp: "23", elixir: "1.15" }
- { otp: "27", elixir: "1.14" }
- { otp: "26", elixir: "1.14" }
- { otp: "22", elixir: "1.14" }
- { otp: "27", elixir: "1.13" }
- { otp: "26", elixir: "1.13" }
- { otp: "25", elixir: "1.13" }
steps:
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
Expand All @@ -53,8 +43,8 @@ jobs:
name: Linting
strategy:
matrix:
elixir: ['1.16']
otp: ['26']
elixir: ["1.17"]
otp: ["27"]
steps:
- uses: actions/checkout@v4
- uses: erlef/setup-beam@v1
Expand All @@ -63,7 +53,7 @@ jobs:
elixir-version: ${{ matrix.elixir }}
id: beam
- name: PLT cache
uses: actions/cache@v3
uses: actions/cache@v4
with:
key: |
${{ runner.os }}-${{ steps.beam.outputs.elixir-version }}-${{ steps.beam.outputs.otp-version }}-plt
Expand Down
63 changes: 28 additions & 35 deletions lib/lapin/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,11 @@ defmodule Lapin.Connection do
end

def init(configuration) do
{:connect, :init,
%{configuration: configuration, consumers: [], producers: [], connection: nil, module: nil}}
{
:connect,
:init,
%{configuration: configuration, consumers: [], producers: [], connection: nil, module: nil}
}
end

@doc """
Expand All @@ -153,10 +156,7 @@ defmodule Lapin.Connection do
def close(connection), do: GenServer.stop(connection)

def terminate(_reason, %{connection: nil}), do: :ok

def terminate(_reason, %{connection: connection}) do
AMQP.Connection.close(connection)
end
def terminate(_reason, %{connection: connection}), do: AMQP.Connection.close(connection)

@doc """
Publishes a message to the specified exchange with the given routing_key
Expand All @@ -168,9 +168,8 @@ defmodule Lapin.Connection do
Payload.t(),
options :: Keyword.t()
) :: on_callback
def publish(connection, exchange, routing_key, payload, options \\ []) do
Connection.call(connection, {:publish, exchange, routing_key, payload, options})
end
def publish(connection, exchange, routing_key, payload, options \\ []),
do: Connection.call(connection, {:publish, exchange, routing_key, payload, options})

def handle_call(
{:publish, _exchange, _routing_key, _payload, _options},
Expand All @@ -186,10 +185,10 @@ defmodule Lapin.Connection do
%{producers: producers, module: module} = state
) do
with {:ok, %Producer{pattern: pattern} = producer} <- Producer.get(producers, exchange),
mandatory <- pattern.mandatory(producer),
persistent <- pattern.persistent(producer),
options <- Keyword.merge([mandatory: mandatory, persistent: persistent], options),
meta <- %{content_type: Payload.content_type(payload)},
mandatory = pattern.mandatory(producer),
persistent = pattern.persistent(producer),
options = Keyword.merge([mandatory: mandatory, persistent: persistent], options),
meta = %{content_type: Payload.content_type(payload)},
{:ok, payload} <- Payload.encode(payload),
:ok <- Producer.publish(producer, exchange, routing_key, payload, options) do
message = %Message{meta: Enum.into(options, meta), payload: payload}
Expand Down Expand Up @@ -317,13 +316,13 @@ defmodule Lapin.Connection do
payload: payload
} = message
) do
with ack <- pattern.ack(consumer),
payload_for <- module.payload_for(consumer, message),
content_type <- Payload.content_type(payload_for),
meta <- Map.put(meta, :content_type, content_type),
{:ok, payload} <- Payload.decode_into(payload_for, payload),
message <- %Message{message | meta: meta, payload: payload},
:ok <- module.handle_deliver(consumer, message) do
payload_for = module.payload_for(consumer, message)

with {:ok, payload} <- Payload.decode_into(payload_for, payload),
ack = pattern.ack(consumer),
content_type = Payload.content_type(payload_for),
meta = Map.put(meta, :content_type, content_type),
:ok <- module.handle_deliver(consumer, %Message{message | meta: meta, payload: payload}) do
Logger.debug(fn -> "Consuming message #{delivery_tag}" end)
consume_ack(ack, consumer, delivery_tag)
else
Expand Down Expand Up @@ -442,21 +441,21 @@ defmodule Lapin.Connection do
defp bind_queues(queues, channel), do: Enum.each(queues, &Queue.bind(&1, channel))

defp create_producers(configuration, connection) do
producers =
{
:ok,
configuration
|> Keyword.get(:producers, [])
|> Enum.map(&Producer.create(connection, &1))

{:ok, producers}
}
end

defp create_consumers(configuration, connection) do
consumers =
{
:ok,
configuration
|> Keyword.get(:consumers, [])
|> Enum.map(&Consumer.create(connection, &1))

{:ok, consumers}
}
end

defp cleanup_configuration(configuration) do
Expand Down Expand Up @@ -527,13 +526,8 @@ defmodule Lapin.Connection do
end
end

defp map_userinfo(userinfo) when is_binary(userinfo) do
parts =
userinfo
|> String.split(":", parts: 2)

[Enum.at(parts, 0), Enum.at(parts, 1)]
end
defp map_userinfo(user_info) when is_binary(user_info),
do: String.split(user_info, ":", parts: 2)

defp map_userinfo(_), do: [nil, nil]

Expand Down Expand Up @@ -563,8 +557,7 @@ defmodule Lapin.Connection do
if Enum.all?(params, &Keyword.has_key?(configuration, &1)) do
:ok
else
missing_params = Enum.reject(params, &Keyword.has_key?(configuration, &1))
{:error, :missing_params, missing_params}
{:error, :missing_params, Enum.reject(params, &Keyword.has_key?(configuration, &1))}
end
end
end
3 changes: 2 additions & 1 deletion lib/lapin/supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
defmodule Lapin.Supervisor do
@moduledoc false
@moduledoc "Lapin Supervisor"

use Supervisor

require Logger

@spec start_link :: Supervisor.on_start()
Expand Down
8 changes: 4 additions & 4 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"credentials_obfuscation": {:hex, :credentials_obfuscation, "3.4.0", "34e18b126b3aefd6e8143776fbe1ceceea6792307c99ac5ee8687911f048cfd7", [:rebar3], [], "hexpm", "738ace0ed5545d2710d3f7383906fc6f6b582d019036e5269c4dbd85dbced566"},
"credo": {:hex, :credo, "1.7.9", "07bb31907746ae2b5e569197c9e16c0d75c8578a22f01bee63f212047efb2647", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "f87c11c34ba579f7c5044f02b2a807e1ed2fa5fdbb24dc7eb4ad59c1904887f3"},
"credo": {:hex, :credo, "1.7.10", "6e64fe59be8da5e30a1b96273b247b5cf1cc9e336b5fd66302a64b25749ad44d", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "71fbc9a6b8be21d993deca85bf151df023a3097b01e09a2809d460348561d8cd"},
"dialyxir": {:hex, :dialyxir, "1.4.4", "fb3ce8741edeaea59c9ae84d5cec75da00fa89fe401c72d6e047d11a61f65f70", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "cd6111e8017ccd563e65621a4d9a4a1c5cd333df30cebc7face8029cacb4eff6"},
"earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"},
"file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"makeup": {:hex, :makeup, "1.1.2", "9ba8837913bdf757787e71c1581c21f9d2455f4dd04cfca785c70bbfff1a76a3", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cce1566b81fbcbd21eca8ffe808f33b221f9eee2cbc7a1706fc3da9ff18e6cac"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.2", "627e84b8e8bf22e60a2579dad15067c755531fea049ae26ef1020cad58fe9578", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "41193978704763f6bbe6cc2758b84909e62984c7752b3784bd3c218bb341706b"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.0", "6f0eff9c9c489f26b69b61440bf1b238d95badae49adac77973cbacae87e3c2e", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "ea7a9307de9d1548d2a72d299058d1fd2339e3d398560a0e46c27dab4891e4d2"},
"makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"},
"makeup_elixir": {:hex, :makeup_elixir, "1.0.0", "74bb8348c9b3a51d5c589bf5aebb0466a84b33274150e3b6ece1da45584afc82", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "49159b7d7d999e836bedaf09dcf35ca18b312230cf901b725a64f3f42e407983"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.1", "c7f58c120b2b5aa5fd80d540a89fdf866ed42f1f3994e4fe189abebeab610839", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "8a89a1eeccc2d798d6ea15496a6e4870b75e014d1af514b1b71fa33134f57814"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"rabbit_common": {:hex, :rabbit_common, "4.0.3", "e927b882733d122f6802662220bdb1a83774852dbe67d21d4e33aaf54f3998dd", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:ranch, "2.1.0", [hex: :ranch, repo: "hexpm", optional: false]}, {:recon, "2.5.6", [hex: :recon, repo: "hexpm", optional: false]}, {:thoas, "1.2.1", [hex: :thoas, repo: "hexpm", optional: false]}], "hexpm", "ead31ba292c2cc5fda48a486417d7cfe8966f661994d7ed6c3e5f8840828e8ec"},
"ranch": {:hex, :ranch, "2.1.0", "2261f9ed9574dcfcc444106b9f6da155e6e540b2f82ba3d42b339b93673b72a3", [:make, :rebar3], [], "hexpm", "244ee3fa2a6175270d8e1fc59024fd9dbc76294a321057de8f803b1479e76916"},
Expand Down

0 comments on commit 65e0964

Please sign in to comment.