Skip to content

Commit

Permalink
feat: added limit on retries for unack'ed messages
Browse files Browse the repository at this point in the history
  • Loading branch information
gabheadz committed Dec 9, 2024
1 parent b7a4ccb commit 94bb265
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 42 deletions.
36 changes: 35 additions & 1 deletion channel-sender/config/config-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,48 @@ channel_sender_ex:
secret_generator:
base: "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc"
salt: "socket auth"
# Max time (in seconds) for a token to be valid
# this parameter is also used to hold the channel genstatemachine in waiting state
# before it is closed
max_age: 900

# initial time in milliseconds to wait before re-sending a message not acked by a channel
initial_redelivery_time: 900

# max time in seconds to wait the client to send the auth token
# before closing the channel
socket_idle_timeout: 30000

# Specifies the maximum time (in milliseconds) that the supervisor waits
# for child processes to terminate after sending it an exit signal (:shutdown).
channel_shutdown_tolerance: 10000

# Specifies the maximum drift time (in seconds) the channel process
# will consider for emitting a new secret token before the current one expires.
# Time to generate will be the greater value between (max_age / 2) and
# (max_age - min_disconnection_tolerance)
min_disconnection_tolerance: 50

on_connected_channel_reply_timeout: 2000

# max time a channel process will wait to perform the send operation before timing out
accept_channel_reply_timeout: 1000

# Allowed max number of unacknowledged messages per client connection
# after this limit is reached, the oldest unacknowledged messages will be dropped
max_unacknowledged_queue: 100

# Allowed max number of retries to re-send unack'ed message to a channel
max_unacknowledged_retries: 10

# Allowed max number of messages pending to be sent to a channel
# received by sender while on waiting state (no socket connection)
max_pending_queue: 100

no_start: false
topology:
strategy: Elixir.Cluster.Strategy.Kubernetes
#strategy: Elixir.Cluster.Strategy.Gossip # for local development
strategy: Elixir.Cluster.Strategy.Kubernetes # for kubernetes
config:
mode: :hostname
kubernetes_ip_lookup_mode: :pods
Expand All @@ -22,6 +54,8 @@ channel_sender_ex:
kubernetes_selector: "cluster=beam"
namespace: "sendernm"
polling_interval: 5000
# see https://github.com/bancolombia/async-dataflow/tree/master/channel-sender/deploy_samples/k8s
# for more information about the kubernetes configuration with libcluster

logger:
level: debug
Expand Down
12 changes: 12 additions & 0 deletions channel-sender/lib/channel_sender_ex/application_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ defmodule ChannelSenderEx.ApplicationConfig do
Map.get(fetch(config, :channel_sender_ex), "socket_idle_timeout", 30_000)
)

Application.put_env(:channel_sender_ex, :max_unacknowledged_retries,
Map.get(fetch(config, :channel_sender_ex), "max_unacknowledged_retries", 20)
)

Application.put_env(:channel_sender_ex, :max_unacknowledged_queue,
Map.get(fetch(config, :channel_sender_ex), "max_unacknowledged_queue", 100)
)

Application.put_env(:channel_sender_ex, :max_pending_queue,
Map.get(fetch(config, :channel_sender_ex), "max_pending_queue", 100)
)

Application.put_env(:channel_sender_ex, :topology, parse_libcluster_topology(config))

if config == %{} do
Expand Down
14 changes: 8 additions & 6 deletions channel-sender/lib/channel_sender_ex/core/bounded_map.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ defmodule ChannelSenderEx.Core.BoundedMap do
A map with a maximum size, evicting the oldest key-value pair when the limit is exceeded.
"""

@max_size 100

@type t :: {map(), list()}

# Builds an empty BoundedMap: an empty map paired with an empty list of keys.
def new do
  {%{}, []}
end

# Number of key-value pairs currently held; only the map half of the tuple is inspected.
def size({entries, _keys}) do
  map_size(entries)
end

# Add a key-value pair, maintaining the max size limit
@spec put(t, String.t, any) :: t
def put({map, keys}, key, value) do
@doc """
Put a key-value pair into the map. If the key already exists, update the value.
The oldest key-value pair is evicted when the size limit is exceeded.
The limit is set by the `max_size` parameter, defaulting to 100.
"""
@spec put(t, String.t, any, integer()) :: t
def put({map, keys}, key, value, max_size \\ 100) do
if Map.has_key?(map, key) do
# If the key already exists, update the map without changing keys
{Map.put(map, key, value), keys}
Expand All @@ -24,7 +26,7 @@ defmodule ChannelSenderEx.Core.BoundedMap do
new_keys = [key | keys]

# Enforce the size limit
if map_size(new_map) > @max_size do
if map_size(new_map) > max_size do
oldest_key = List.last(new_keys)
{Map.delete(new_map, oldest_key), List.delete_at(new_keys, -1)}
else
Expand Down
54 changes: 35 additions & 19 deletions channel-sender/lib/channel_sender_ex/core/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule ChannelSenderEx.Core.Channel do
alias ChannelSenderEx.Core.ChannelIDGenerator
alias ChannelSenderEx.Core.ProtocolMessage
alias ChannelSenderEx.Core.RulesProvider
import ChannelSenderEx.Core.Retry.ExponentialBackoff, only: [exp_back_off: 4]

@on_connected_channel_reply_timeout 2000

Expand Down Expand Up @@ -72,10 +73,9 @@ defmodule ChannelSenderEx.Core.Channel do
@type deliver_response :: :accepted_waiting | :accepted_connected
@spec deliver_message(:gen_statem.server_ref(), ProtocolMessage.t()) :: deliver_response()
# Synchronously hands `message` to the channel process `server` and returns its reply.
# The call timeout is read from the :accept_channel_reply_timeout parameter and falls
# back to 1_000 when the parameter is not configured, so the call never runs with a
# nil timeout. The message is sent exactly once.
def deliver_message(server, message) do
  reply_timeout = get_param(:accept_channel_reply_timeout, 1_000)
  GenStateMachine.call(server, {:deliver_message, message}, reply_timeout)
end

@spec start_link(any()) :: :gen_statem.start_ret()
Expand All @@ -100,7 +100,7 @@ defmodule ChannelSenderEx.Core.Channel do
# State-enter callback for the waiting state.
# Reads :max_age (900 when unset), converts it to milliseconds and arms a state
# timeout that delivers :waiting_timeout when it elapses. The transition is logged
# once, at info level.
def waiting(:enter, _old_state, data) do
  # time to wait for the socket to be authenticated
  waiting_timeout = round(get_param(:max_age, 900) * 1000)

  Logger.info("Channel #{data.channel} entering waiting state and expecting a socket connection and authentication. max wait time: #{waiting_timeout} ms")

  {:keep_state, data, [{:state_timeout, waiting_timeout, :waiting_timeout}]}
end

Expand Down Expand Up @@ -182,7 +182,7 @@ defmodule ChannelSenderEx.Core.Channel do

# State-enter callback for the connected state.
# Arms a state timeout (duration from calculate_refresh_token_timeout/0) that delivers
# :refresh_token_timeout when it elapses. The transition is logged once, at info level.
def connected(:enter, _old_state, data) do
  refresh_timeout = calculate_refresh_token_timeout()
  Logger.info("Channel #{data.channel} entering connected state")
  {:keep_state_and_data, [{:state_timeout, refresh_timeout, :refresh_token_timeout}]}
end

Expand Down Expand Up @@ -220,7 +220,7 @@ defmodule ChannelSenderEx.Core.Channel do
# 2. schedule a timer to retry the message delivery if not acknowledged in the expected time frame
actions = [
_reply = {:reply, from, :accepted_connected},
_timeout = {{:timeout, {:redelivery, ref}}, get_param(:initial_redelivery_time, 500), 0}
_timeout = {{:timeout, {:redelivery, ref}}, get_param(:initial_redelivery_time, 900), 0}
]

new_data =
Expand All @@ -246,15 +246,27 @@ defmodule ChannelSenderEx.Core.Channel do
## And it will continue to be executed until the message is acknowledged by the client
## or the configured maximum number of retries (max_unacknowledged_retries) is reached.
# Redelivery timer callback for the message tracked under `ref`.
# `retries` is the number of redeliveries already attempted for that message.
#
# Outcomes:
#   * no pending entry for `ref` - nothing is resent and no timer is re-armed
#   * retry budget exhausted     - the message is dropped from pending_ack and a warning is logged
#   * otherwise                  - the message is resent, stored again and the timer re-armed
def connected({:timeout, {:redelivery, ref}}, retries, data = %{socket: {socket_pid, _}}) do
  max_unacknowledged_retries = get_param(:max_unacknowledged_retries, 20)

  case retrieve_pending_ack(data, ref) do
    {:noop, _unchanged} ->
      # The entry is gone (for example BoundedMap evicted it once the queue limit was
      # exceeded). Destructuring :noop as a message tuple would crash the channel, so
      # just stop retrying for this ref.
      :keep_state_and_data

    {message, new_data} when retries >= max_unacknowledged_retries ->
      {message_id, _, _, _, _} = message
      Logger.warning("Channel #{data.channel} reached max retries for message #{inspect(message_id)}")
      # new_data no longer holds the message, so it will not be retried again
      {:keep_state, new_data}

    {message, new_data} ->
      output = send(socket_pid, create_output_message(message, ref))

      # reschedule the timer to keep retrying to deliver the message
      next_delay = round(exp_back_off(get_param(:initial_redelivery_time, 900), 3_000, retries, 0.2))
      Logger.debug("Channel #{data.channel} redelivering message in #{next_delay} ms")

      actions = [
        _timeout = {{:timeout, {:redelivery, ref}}, next_delay, retries + 1}
      ]

      {:keep_state, save_pending_ack(new_data, output), actions}
  end
end

## Handle the case when the socket is disconnected. This method is called because the socket is monitored.
Expand Down Expand Up @@ -305,19 +317,23 @@ defmodule ChannelSenderEx.Core.Channel do
send(socket_pid, output)
end

#@spec save_pending_ack(Data.t(), output_message()) :: Data.t()
@compile {:inline, save_pending_ack: 2}
# Stores `message` in data.pending_ack under the delivery reference `ref`.
# The map is bounded by the :max_unacknowledged_queue parameter (100 when unset);
# BoundedMap.put/4 evicts the oldest entry once that limit is exceeded.
defp save_pending_ack(data = %{pending_ack: pending_ack}, {:deliver_msg, {_, ref}, message}) do
  {msg_id, _, _, _, _} = message
  Logger.debug("Channel #{data.channel} saving pending ack #{msg_id}")
  max_unacked = get_param(:max_unacknowledged_queue, 100)
  %{data | pending_ack: BoundedMap.put(pending_ack, ref, message, max_unacked)}
end

# Removes and returns the message stored in data.pending_ack under `ref`.
# Returns {message, data_without_that_entry} when the ref is known, or
# {:noop, data} (data untouched, warning logged) when BoundedMap.pop/2 reports
# that there is no entry for `ref`. Callers must handle the :noop case.
@spec retrieve_pending_ack(Data.t(), reference()) :: {ProtocolMessage.t() | :noop, Data.t()}
@compile {:inline, retrieve_pending_ack: 2}
defp retrieve_pending_ack(data = %{pending_ack: pending_ack}, ref) do
  case BoundedMap.pop(pending_ack, ref) do
    {:noop, _} ->
      Logger.warning("Channel #{data.channel} received ack for unknown message ref #{inspect(ref)}")
      {:noop, data}

    {message, remaining} ->
      {message, %{data | pending_ack: remaining}}
  end
end

@spec save_pending_send(Data.t(), ProtocolMessage.t()) :: Data.t()
Expand All @@ -327,7 +343,7 @@ defmodule ChannelSenderEx.Core.Channel do
Logger.debug("Channel #{data.channel} saving pending msg #{msg_id}")
%{
data
| pending_sending: BoundedMap.put(pending_sending, msg_id, message)
| pending_sending: BoundedMap.put(pending_sending, msg_id, message, get_param(:max_pending_queue, 100))
}
end

Expand Down
15 changes: 14 additions & 1 deletion channel-sender/test/channel_sender_ex/core/bounded_map_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule ChannelSenderEx.Core.BoundedMapTest do

test "should save elements" do
map = BoundedMap.new
|> BoundedMap.put("key1", "value1")
|> BoundedMap.put("key1", "value1", 100)
|> BoundedMap.put("key2", "value2")

assert BoundedMap.get(map, "key1") == "value1"
Expand Down Expand Up @@ -75,4 +75,17 @@ defmodule ChannelSenderEx.Core.BoundedMapTest do
assert Map.has_key?(new_map, "key2")
end

test "should allow merge" do
  # two disjoint maps with two entries each
  left =
    BoundedMap.new()
    |> BoundedMap.put("key1", "value1")
    |> BoundedMap.put("key2", "value2")

  right =
    BoundedMap.new()
    |> BoundedMap.put("key3", "value3")
    |> BoundedMap.put("key4", "value4")

  # merging must keep all four entries
  combined = BoundedMap.merge(left, right)
  assert BoundedMap.size(combined) == 4
end

end
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do
secret: secret
} do

{conn, stream} = assert_connect_and_authenticate(port, channel, secret)
{conn, _stream} = assert_connect_and_authenticate(port, channel, secret)
:gun.close(conn)
Process.sleep(100)
end
Expand Down
10 changes: 5 additions & 5 deletions channel-sender/test/channel_sender_ex/core/channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ defmodule ChannelSenderEx.Core.ChannelTest do
message_to_send = ProtocolMessage.to_protocol_message(message)
:accepted_connected = Channel.deliver_message(pid, message_to_send)
assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}
assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 200
assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 1000
Process.exit(pid, :kill)
end

Expand All @@ -94,7 +94,7 @@ defmodule ChannelSenderEx.Core.ChannelTest do
:accepted_connected = Channel.deliver_message(pid, message_to_send)
assert_receive {:deliver_msg, _from = {^pid, ref}, ^message_to_send}
Channel.notify_ack(pid, ref, message.message_id)
refute_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 300
refute_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 1000
Process.exit(pid, :kill)
end

Expand Down Expand Up @@ -151,7 +151,7 @@ defmodule ChannelSenderEx.Core.ChannelTest do
:accepted_connected = Channel.deliver_message(pid, message_to_send)
assert_receive {:deliver_msg, _from = {^pid, ref}, ^message_to_send}
# Receive retry
assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 150
assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 1000

# Late ack
Channel.notify_ack(pid, ref, message.message_id)
Expand All @@ -176,14 +176,14 @@ defmodule ChannelSenderEx.Core.ChannelTest do
assert_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send}

send(proxy, :stop)
refute_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send}, 350
refute_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send}, 1000
assert {:waiting, _data} = :sys.get_state(channel_pid)

proxy = proxy_process()
:ok = Channel.socket_connected(channel_pid, proxy)

assert_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send}
assert_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send}, 300
assert_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send}, 1000

send(proxy, :stop)
Process.exit(channel_pid, :kill)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ defmodule ChannelSenderEx.Core.NodeObserverTest do

{:ok, _} = Application.ensure_all_started(:telemetry)

{:ok, pid_registry} = Horde.Registry.start_link(name: ChannelRegistry, keys: :unique)
{:ok, _pid_registry} = Horde.Registry.start_link(name: ChannelRegistry, keys: :unique)

{:ok, pid_supervisor} =
{:ok, _pid_supervisor} =
Horde.DynamicSupervisor.start_link(name: ChannelSupervisor, strategy: :one_for_one)

{:ok, pid: pid}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do

test "Should handle bad request", %{port: port, channel: channel} do
conn = bad_connect(port, channel)
assert_receive {:gun_response, ^conn, stream, :fin, 400, _}, 300
assert_receive {:gun_response, ^conn, _stream, :fin, 400, _}, 300

:gun.close(conn)
end

test "Should connect to socket", %{port: port, channel: channel} do
conn = connect(port, channel)
assert_receive {:gun_upgrade, ^conn, stream, ["websocket"], _headers}, 300
assert_receive {:gun_upgrade, ^conn, _stream, ["websocket"], _headers}, 300
:gun.close(conn)
end

Expand All @@ -94,7 +94,7 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do
end

test "Should authenticate with binary protocol", %{port: port, channel: channel, secret: secret} do
{conn, stream} = assert_connect_and_authenticate(port, channel, secret, @binary)
{conn, _stream} = assert_connect_and_authenticate(port, channel, secret, @binary)
:gun.close(conn)
end

Expand Down Expand Up @@ -203,7 +203,7 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do
{app_id, user_ref} = {"App1", "User1234"}
channel_ref = ChannelIDGenerator.generate_channel_id(app_id, user_ref)
channel_secret = ChannelIDGenerator.generate_token(channel_ref, app_id, user_ref)
{conn, stream} = assert_reject(port, channel_ref, channel_secret)
{_conn, _stream} = assert_reject(port, channel_ref, channel_secret)
end

test "Should reestablish Channel link when Channel gets restarted", %{
Expand Down Expand Up @@ -264,13 +264,13 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do
assert_receive {:gun_ws, ^conn, ^stream, data_string = {type, _string}}
assert {^message_id, "", "event.test", ^data, _} = decode_message(data_string)

assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 150
assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 1000
assert {^message_id, "", "event.test", ^data, _} = decode_message(data_string)

assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 150
assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 1000
assert {^message_id, "", "event.test", ^data, _} = decode_message(data_string)

assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 150
assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 1000
assert {^message_id, "", "event.test", ^data, _} = decode_message(data_string)

:gun.close(conn)
Expand Down

0 comments on commit 94bb265

Please sign in to comment.