diff --git a/channel-sender/config/config-local.yaml b/channel-sender/config/config-local.yaml index db73859..4a2bc69 100644 --- a/channel-sender/config/config-local.yaml +++ b/channel-sender/config/config-local.yaml @@ -4,16 +4,48 @@ channel_sender_ex: secret_generator: base: "aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc" salt: "socket auth" + # Max time (in seconds) for a token to be valid + # this parameter is also used to hold the channel genstatemachine in wainting state + # before it is closed max_age: 900 + + # initial time in seconds to wait before re-send a message not acked to a channel initial_redelivery_time: 900 + + # max time in seconds to wait the client to send the auth token + # before closing the channel socket_idle_timeout: 30000 + + # Specifies the maximum time (in milliseconds) that the supervisor waits + # for child processes to terminate after sending it an exit signal (:shutdown). channel_shutdown_tolerance: 10000 + + # Specifies the maximum drift time (in seconds) the channel process + # will consider for emiting a new secret token before the current one expires. + # Time to generate will be the greater value between (max_age / 2) and + # (max_age - min_disconnection_tolerance) min_disconnection_tolerance: 50 + on_connected_channel_reply_timeout: 2000 + + # max time a channel process will wait to perform the send operation before times out accept_channel_reply_timeout: 1000 + + # Allowed max number of unacknowledged messages per client connection + # after this limit is reached, oldes unacknowledged messages will be dropped + max_unacknowledged_queue: 100 + + # Allowed max number of retries to re-send unack'ed message to a channel + max_unacknowledged_retries: 10 + + # Allowed max number of messages pending to be sent to a channel + # received by sender while on waiting state (no socket connection) + max_pending_queue: 100 + no_start: false topology: - strategy: Elixir.Cluster.Strategy.Kubernetes + #strategy: Elixir.Cluster.Strategy.Gossip # for local development + strategy: Elixir.Cluster.Strategy.Kubernetes # for kubernetes config: mode: :hostname kubernetes_ip_lookup_mode: :pods @@ -22,6 +54,8 @@ channel_sender_ex: kubernetes_selector: "cluster=beam" namespace: "sendernm" polling_interval: 5000 + # see https://github.com/bancolombia/async-dataflow/tree/master/channel-sender/deploy_samples/k8s + # for more information about the kubernetes configuration with libcluser logger: level: debug diff --git a/channel-sender/lib/channel_sender_ex/application_config.ex b/channel-sender/lib/channel_sender_ex/application_config.ex index 1a73635..3193c91 100644 --- a/channel-sender/lib/channel_sender_ex/application_config.ex +++ b/channel-sender/lib/channel_sender_ex/application_config.ex @@ -90,6 +90,18 @@ defmodule ChannelSenderEx.ApplicationConfig do Map.get(fetch(config, :channel_sender_ex), "socket_idle_timeout", 30_000) ) + Application.put_env(:channel_sender_ex, :max_unacknowledged_retries, + Map.get(fetch(config, :channel_sender_ex), "max_unacknowledged_retries", 20) + ) + + Application.put_env(:channel_sender_ex, :max_unacknowledged_queue, + Map.get(fetch(config, :channel_sender_ex), "max_unacknowledged_queue", 100) + ) + + Application.put_env(:channel_sender_ex, :max_pending_queue, + Map.get(fetch(config, :channel_sender_ex), "max_pending_queue", 100) + ) + Application.put_env(:channel_sender_ex, :topology, parse_libcluster_topology(config)) if config == %{} do diff --git a/channel-sender/lib/channel_sender_ex/core/bounded_map.ex b/channel-sender/lib/channel_sender_ex/core/bounded_map.ex index a1f3f6f..80a286e 100644 --- a/channel-sender/lib/channel_sender_ex/core/bounded_map.ex +++ b/channel-sender/lib/channel_sender_ex/core/bounded_map.ex @@ -3,8 +3,6 @@ defmodule ChannelSenderEx.Core.BoundedMap do A map with a maximum size, evicting the oldest key-value pair when the limit is exceeded. """ - @max_size 100 - @type t :: {map(), list()} # Initialize a new BoundedMap @@ -12,9 +10,13 @@ defmodule ChannelSenderEx.Core.BoundedMap do def size({map, _keys}), do: map_size(map) - # Add a key-value pair, maintaining the max size limit - @spec put(t, String.t, any) :: t - def put({map, keys}, key, value) do + @doc """ + Put a key-value pair into the map. If the key already exists, update the value. + The oldest key-value pair is evicted when the size limit is exceeded. + The limit is set by the `max_size` parameter, defaulting to 100. + """ + @spec put(t, String.t, any, integer()) :: t + def put({map, keys}, key, value, max_size \\ 100) do if Map.has_key?(map, key) do # If the key already exists, update the map without changing keys {Map.put(map, key, value), keys} @@ -24,7 +26,7 @@ defmodule ChannelSenderEx.Core.BoundedMap do new_keys = [key | keys] # Enforce the size limit - if map_size(new_map) > @max_size do + if map_size(new_map) > max_size do oldest_key = List.last(new_keys) {Map.delete(new_map, oldest_key), List.delete_at(new_keys, -1)} else diff --git a/channel-sender/lib/channel_sender_ex/core/channel.ex b/channel-sender/lib/channel_sender_ex/core/channel.ex index aa51f84..72ea09e 100644 --- a/channel-sender/lib/channel_sender_ex/core/channel.ex +++ b/channel-sender/lib/channel_sender_ex/core/channel.ex @@ -8,6 +8,7 @@ defmodule ChannelSenderEx.Core.Channel do alias ChannelSenderEx.Core.ChannelIDGenerator alias ChannelSenderEx.Core.ProtocolMessage alias ChannelSenderEx.Core.RulesProvider + import ChannelSenderEx.Core.Retry.ExponentialBackoff, only: [exp_back_off: 4] @on_connected_channel_reply_timeout 2000 @@ -72,10 +73,9 @@ defmodule ChannelSenderEx.Core.Channel do @type deliver_response :: :accepted_waiting | :accepted_connected @spec deliver_message(:gen_statem.server_ref(), ProtocolMessage.t()) :: deliver_response() def deliver_message(server, message) do - GenStateMachine.call(server, {:deliver_message, message}, Application.get_env( - :channel_sender_ex, - :accept_channel_reply_timeout - )) + GenStateMachine.call(server, {:deliver_message, message}, + get_param(:accept_channel_reply_timeout, 1_000) + ) end @spec start_link(any()) :: :gen_statem.start_ret() @@ -100,7 +100,7 @@ defmodule ChannelSenderEx.Core.Channel do def waiting(:enter, _old_state, data) do # time to wait for the socket to be authenticated waiting_timeout = round(get_param(:max_age, 900) * 1000) - Logger.debug("Channel #{data.channel} entering waiting state and expecting a socket connection and authentication. max wait time: #{waiting_timeout} ms") + Logger.info("Channel #{data.channel} entering waiting state and expecting a socket connection and authentication. max wait time: #{waiting_timeout} ms") {:keep_state, data, [{:state_timeout, waiting_timeout, :waiting_timeout}]} end @@ -182,7 +182,7 @@ defmodule ChannelSenderEx.Core.Channel do def connected(:enter, _old_state, data) do refresh_timeout = calculate_refresh_token_timeout() - Logger.debug("Channel #{data.channel} entering connected state") + Logger.info("Channel #{data.channel} entering connected state") {:keep_state_and_data, [{:state_timeout, refresh_timeout, :refresh_token_timeout}]} end @@ -220,7 +220,7 @@ defmodule ChannelSenderEx.Core.Channel do # 2. schedule a timer to retry the message delivery if not acknowledged in the expected time frame actions = [ _reply = {:reply, from, :accepted_connected}, - _timeout = {{:timeout, {:redelivery, ref}}, get_param(:initial_redelivery_time, 500), 0} + _timeout = {{:timeout, {:redelivery, ref}}, get_param(:initial_redelivery_time, 900), 0} ] new_data = @@ -246,15 +246,27 @@ defmodule ChannelSenderEx.Core.Channel do ## And it will continue to be executed until the message is acknowledged by the client. def connected({:timeout, {:redelivery, ref}}, retries, data = %{socket: {socket_pid, _}}) do {message, new_data} = retrieve_pending_ack(data, ref) - output = send(socket_pid, create_output_message(message, ref)) - # reschedule the timer to keep retrying to deliver the message - actions = [ - _timeout = - {{:timeout, {:redelivery, ref}}, get_param(:initial_redelivery_time, 500), retries + 1} - ] + max_unacknowledged_retries = get_param(:max_unacknowledged_retries, 20) + case retries do + r when r >= max_unacknowledged_retries -> + {message_id, _, _, _, _} = message + Logger.warning("Channel #{data.channel} reached max retries for message #{inspect(message_id)}") + {:keep_state, new_data} - {:keep_state, save_pending_ack(new_data, output), actions} + _ -> + output = send(socket_pid, create_output_message(message, ref)) + + # reschedule the timer to keep retrying to deliver the message + next_delay = round(exp_back_off(get_param(:initial_redelivery_time, 900), 3_000, retries, 0.2)) + Logger.debug("Channel #{data.channel} redelivering message in #{next_delay} ms") + actions = [ + _timeout = + {{:timeout, {:redelivery, ref}}, next_delay, retries + 1} + ] + + {:keep_state, save_pending_ack(new_data, output), actions} + end end ## Handle the case when the socket is disconnected. This method is called because the socket is monitored. @@ -305,19 +317,23 @@ defmodule ChannelSenderEx.Core.Channel do send(socket_pid, output) end - #@spec save_pending_ack(Data.t(), output_message()) :: Data.t() @compile {:inline, save_pending_ack: 2} defp save_pending_ack(data = %{pending_ack: pending_ack}, {:deliver_msg, {_, ref}, message}) do {msg_id, _, _, _, _} = message Logger.debug("Channel #{data.channel} saving pending ack #{msg_id}") - %{data | pending_ack: BoundedMap.put(pending_ack, ref, message)} + %{data | pending_ack: BoundedMap.put(pending_ack, ref, message, get_param(:max_unacknowledged_queue, 100))} end @spec retrieve_pending_ack(Data.t(), reference()) :: {ProtocolMessage.t(), Data.t()} @compile {:inline, retrieve_pending_ack: 2} defp retrieve_pending_ack(data = %{pending_ack: pending_ack}, ref) do - {message, new_pending_ack} = BoundedMap.pop(pending_ack, ref) - {message, %{data | pending_ack: new_pending_ack}} + case BoundedMap.pop(pending_ack, ref) do + {:noop, _} -> + Logger.warning("Channel #{data.channel} received ack for unknown message ref #{inspect(ref)}") + {:noop, data} + {message, new_pending_ack} -> + {message, %{data | pending_ack: new_pending_ack}} + end end @spec save_pending_send(Data.t(), ProtocolMessage.t()) :: Data.t() @@ -327,7 +343,7 @@ defmodule ChannelSenderEx.Core.Channel do Logger.debug("Channel #{data.channel} saving pending msg #{msg_id}") %{ data - | pending_sending: BoundedMap.put(pending_sending, msg_id, message) + | pending_sending: BoundedMap.put(pending_sending, msg_id, message, get_param(:max_pending_queue, 100)) } end diff --git a/channel-sender/test/channel_sender_ex/core/bounded_map_test.exs b/channel-sender/test/channel_sender_ex/core/bounded_map_test.exs index 0c2c66a..a1011ae 100644 --- a/channel-sender/test/channel_sender_ex/core/bounded_map_test.exs +++ b/channel-sender/test/channel_sender_ex/core/bounded_map_test.exs @@ -4,7 +4,7 @@ defmodule ChannelSenderEx.Core.BoundedMapTest do test "should save elements" do map = BoundedMap.new - |> BoundedMap.put("key1", "value1") + |> BoundedMap.put("key1", "value1", 100) |> BoundedMap.put("key2", "value2") assert BoundedMap.get(map, "key1") == "value1" @@ -75,4 +75,17 @@ defmodule ChannelSenderEx.Core.BoundedMapTest do assert Map.has_key?(new_map, "key2") end + test "should allow merge" do + map = BoundedMap.new + |> BoundedMap.put("key1", "value1") + |> BoundedMap.put("key2", "value2") + + map2 = BoundedMap.new + |> BoundedMap.put("key3", "value3") + |> BoundedMap.put("key4", "value4") + + merged = BoundedMap.merge(map, map2) + assert BoundedMap.size(merged) == 4 + end + end diff --git a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs index d1f5032..ed702f6 100644 --- a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs +++ b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs @@ -70,7 +70,7 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do secret: secret } do - {conn, stream} = assert_connect_and_authenticate(port, channel, secret) + {conn, _stream} = assert_connect_and_authenticate(port, channel, secret) :gun.close(conn) Process.sleep(100) end diff --git a/channel-sender/test/channel_sender_ex/core/channel_test.exs b/channel-sender/test/channel_sender_ex/core/channel_test.exs index 6ba9997..bef7291 100644 --- a/channel-sender/test/channel_sender_ex/core/channel_test.exs +++ b/channel-sender/test/channel_sender_ex/core/channel_test.exs @@ -83,7 +83,7 @@ defmodule ChannelSenderEx.Core.ChannelTest do message_to_send = ProtocolMessage.to_protocol_message(message) :accepted_connected = Channel.deliver_message(pid, message_to_send) assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send} - assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 200 + assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 1000 Process.exit(pid, :kill) end @@ -94,7 +94,7 @@ defmodule ChannelSenderEx.Core.ChannelTest do :accepted_connected = Channel.deliver_message(pid, message_to_send) assert_receive {:deliver_msg, _from = {^pid, ref}, ^message_to_send} Channel.notify_ack(pid, ref, message.message_id) - refute_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 300 + refute_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 1000 Process.exit(pid, :kill) end @@ -151,7 +151,7 @@ defmodule ChannelSenderEx.Core.ChannelTest do :accepted_connected = Channel.deliver_message(pid, message_to_send) assert_receive {:deliver_msg, _from = {^pid, ref}, ^message_to_send} # Receive retry - assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 150 + assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 1000 # Late ack Channel.notify_ack(pid, ref, message.message_id) @@ -176,14 +176,14 @@ defmodule ChannelSenderEx.Core.ChannelTest do assert_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send} send(proxy, :stop) - refute_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send}, 350 + refute_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send}, 1000 assert {:waiting, _data} = :sys.get_state(channel_pid) proxy = proxy_process() :ok = Channel.socket_connected(channel_pid, proxy) assert_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send} - assert_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send}, 300 + assert_receive {:deliver_msg, _from = {^channel_pid, _ref}, ^message_to_send}, 1000 send(proxy, :stop) Process.exit(channel_pid, :kill) diff --git a/channel-sender/test/channel_sender_ex/core/node_observer_test.exs b/channel-sender/test/channel_sender_ex/core/node_observer_test.exs index 6caa981..68d2a2a 100644 --- a/channel-sender/test/channel_sender_ex/core/node_observer_test.exs +++ b/channel-sender/test/channel_sender_ex/core/node_observer_test.exs @@ -10,9 +10,9 @@ defmodule ChannelSenderEx.Core.NodeObserverTest do {:ok, _} = Application.ensure_all_started(:telemetry) - {:ok, pid_registry} = Horde.Registry.start_link(name: ChannelRegistry, keys: :unique) + {:ok, _pid_registry} = Horde.Registry.start_link(name: ChannelRegistry, keys: :unique) - {:ok, pid_supervisor} = + {:ok, _pid_supervisor} = Horde.DynamicSupervisor.start_link(name: ChannelSupervisor, strategy: :one_for_one) {:ok, pid: pid} diff --git a/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs b/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs index 37eb1ba..a77ea23 100644 --- a/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs +++ b/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs @@ -77,14 +77,14 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do test "Should handle bad request", %{port: port, channel: channel} do conn = bad_connect(port, channel) - assert_receive {:gun_response, ^conn, stream, :fin, 400, _}, 300 + assert_receive {:gun_response, ^conn, _stream, :fin, 400, _}, 300 :gun.close(conn) end test "Should connect to socket", %{port: port, channel: channel} do conn = connect(port, channel) - assert_receive {:gun_upgrade, ^conn, stream, ["websocket"], _headers}, 300 + assert_receive {:gun_upgrade, ^conn, _stream, ["websocket"], _headers}, 300 :gun.close(conn) end @@ -94,7 +94,7 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do end test "Should authenticate with binary protocol", %{port: port, channel: channel, secret: secret} do - {conn, stream} = assert_connect_and_authenticate(port, channel, secret, @binary) + {conn, _stream} = assert_connect_and_authenticate(port, channel, secret, @binary) :gun.close(conn) end @@ -203,7 +203,7 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do {app_id, user_ref} = {"App1", "User1234"} channel_ref = ChannelIDGenerator.generate_channel_id(app_id, user_ref) channel_secret = ChannelIDGenerator.generate_token(channel_ref, app_id, user_ref) - {conn, stream} = assert_reject(port, channel_ref, channel_secret) + {_conn, _stream} = assert_reject(port, channel_ref, channel_secret) end test "Should reestablish Channel link when Channel gets restarted", %{ @@ -264,13 +264,13 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do assert_receive {:gun_ws, ^conn, ^stream, data_string = {type, _string}} assert {^message_id, "", "event.test", ^data, _} = decode_message(data_string) - assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 150 + assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 1000 assert {^message_id, "", "event.test", ^data, _} = decode_message(data_string) - assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 150 + assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 1000 assert {^message_id, "", "event.test", ^data, _} = decode_message(data_string) - assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 150 + assert_receive {:gun_ws, ^conn, ^stream, data_string = {^type, _string}}, 1000 assert {^message_id, "", "event.test", ^data, _} = decode_message(data_string) :gun.close(conn)