Skip to content

Commit

Permalink
Reverse the logic from pull to push. (#210)
Browse files Browse the repository at this point in the history
  • Loading branch information
frnmjn authored Dec 2, 2024
1 parent 9c627e2 commit dcf521f
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 49 deletions.
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ and this project adheres to

---

## [6.1.1] - 2024-12-02

### Added

- ([#208](https://github.com/primait/amqpx/pull/208)) Reverse the logic for draining.
Now the application signal handler call the Amqpx.SignalHandler to trigger the drain.

---

## [6.1.0] - 2024-11-29

### Added
Expand Down Expand Up @@ -79,7 +88,8 @@ and this project adheres to
- ([#129](https://github.com/primait/amqpx/pull/)) Default binding for DLX
queues instead of wildcard

[Unreleased]: https://github.com/primait/amqpx/compare/6.1.0...HEAD
[Unreleased]: https://github.com/primait/amqpx/compare/6.1.1...HEAD
[6.1.1]: https://github.com/primait/amqpx/compare/6.1.0...6.1.1
[6.1.0]: https://github.com/primait/amqpx/compare/6.0.4...6.1.0
[6.0.4]: https://github.com/primait/amqpx/compare/6.0.3...6.0.4
[6.0.3]: https://github.com/primait/amqpx/compare/6.0.2...6.0.3
Expand Down
17 changes: 3 additions & 14 deletions lib/amqp/gen/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Amqpx.Gen.Consumer do
require Logger
use GenServer
import Amqpx.Core
alias Amqpx.{Basic, Channel}
alias Amqpx.{Basic, Channel, SignalHandler}

defstruct [
:channel,
Expand Down Expand Up @@ -252,19 +252,10 @@ defmodule Amqpx.Gen.Consumer do
state
end

@type signal_status :: :stopping | :draining | :running

@spec get_signal_status :: signal_status()
defp get_signal_status do
cond do
signal_handler().stopping?() -> :stopping
signal_handler().draining?() -> :draining
true -> :running
end
end
@type signal_status :: :running | :draining | :stopping

@spec handle_signals(signal_status(), state(), String.t()) :: {:ok | :stop, state()}
defp handle_signals(signal_status \\ get_signal_status(), state, consumer_tag)
defp handle_signals(signal_status \\ SignalHandler.get_signal_status(), state, consumer_tag)

# Close channel when we we need to stop.
defp handle_signals(:stopping, state, _) do
Expand All @@ -285,6 +276,4 @@ defmodule Amqpx.Gen.Consumer do

# No signals received run as normal
defp handle_signals(:running, state, _), do: {:ok, state}

defp signal_handler, do: Application.get_env(:amqpx, :signal_handler, Amqpx.NoSignalHandler)
end
11 changes: 10 additions & 1 deletion lib/amqp/helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ defmodule Amqpx.Helper do
end

def consumers_supervisor_configuration(handlers_conf) do
Enum.map(handlers_conf, &Supervisor.child_spec({Amqpx.Gen.Consumer, &1}, id: UUID.uuid1()))
amqp_signal_handler() ++
Enum.map(handlers_conf, &Supervisor.child_spec({Amqpx.Gen.Consumer, &1}, id: UUID.uuid1()))
end

def producer_supervisor_configuration(producer_conf) do
Expand Down Expand Up @@ -179,4 +180,12 @@ defmodule Amqpx.Helper do
def setup_exchange(channel, %{name: name, type: type}) do
Exchange.declare(channel, name, type)
end

defp amqp_signal_handler,
do: [
%{
id: Amqpx.SignalHandler,
start: {Amqpx.SignalHandler, :start_link, []}
}
]
end
16 changes: 0 additions & 16 deletions lib/amqp/no_signal_handler.ex

This file was deleted.

46 changes: 32 additions & 14 deletions lib/amqp/signal_handler.ex
Original file line number Diff line number Diff line change
@@ -1,20 +1,38 @@
defmodule Amqpx.SignalHandler do
@moduledoc """
Signal handler behaviour is used to catch the SIGTERM signal and gracefully stop the application.
In the context of Rabbitmq, it will:
cancel the channel when we are in draining mode to stop prefetch new messages.
close the channel when we are in stopping mode to reject all the unacked messages that we did't start to consume.
This module is responsible for handling signals sent to the application.
"""
use GenServer

Check in Peano how to use it.
def start_link do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

"""
@doc """
Check if the application is in draining mode.
"""
@callback draining? :: boolean
def init(_) do
{:ok, :running}
end

@doc """
Check if the application is in stopping mode.
"""
@callback stopping? :: boolean
def draining do
GenServer.call(__MODULE__, :draining)
end

def stopping do
GenServer.call(__MODULE__, :stopping)
end

def get_signal_status do
GenServer.call(__MODULE__, :get_signal_status)
end

def handle_call(:draining, _from, _state) do
{:reply, :ok, :draining}
end

def handle_call(:stopping, _from, _state) do
{:reply, :ok, :stopping}
end

def handle_call(:get_signal_status, _from, state) do
{:reply, state, state}
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Amqpx.MixProject do
[
app: :amqpx,
name: "amqpx",
version: "6.1.0",
version: "6.1.1",
elixir: "~> 1.14",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :production,
Expand Down
7 changes: 5 additions & 2 deletions test/gen_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Amqpx.Test.AmqpxTest do
alias Amqpx.Test.Support.Producer3
alias Amqpx.Test.Support.ProducerWithRetry
alias Amqpx.Test.Support.ProducerConnectionTwo
alias Amqpx.SignalHandler

import Mock

Expand Down Expand Up @@ -530,7 +531,7 @@ defmodule Amqpx.Test.AmqpxTest do
payload = %{test: 1}

with_mocks [
{Amqpx.NoSignalHandler, [], stopping?: fn -> true end},
{SignalHandler, [], get_signal_status: fn -> :stopping end},
{Consumer1, [], []}
] do
Producer1.send_payload(payload)
Expand All @@ -547,7 +548,7 @@ defmodule Amqpx.Test.AmqpxTest do
payload = %{test: 1}

with_mocks [
{Amqpx.NoSignalHandler, [], stopping?: [in_series([], [false, true])], draining?: fn -> true end},
{SignalHandler, [], get_signal_status: [in_series([], [:draining, :stopping])]},
{Consumer1, [], [handle_message: fn _, _, state -> {:ok, state} end]}
] do
Producer1.send_payload(payload)
Expand Down Expand Up @@ -640,6 +641,8 @@ defmodule Amqpx.Test.AmqpxTest do
|> Application.fetch_env!(:consumers)
|> Enum.find(&(&1.handler_module == name))

SignalHandler.start_link()

if is_nil(opts) do
raise "Consumer #{name} not found"
end
Expand Down

0 comments on commit dcf521f

Please sign in to comment.