diff --git a/channel-sender/config/config-local.yaml b/channel-sender/config/config-local.yaml
index f0a6fa4..59f8569 100644
--- a/channel-sender/config/config-local.yaml
+++ b/channel-sender/config/config-local.yaml
@@ -16,8 +16,10 @@ channel_sender_ex:
# before closing the channel
socket_idle_timeout: 30000
- # Specifies the maximum time (in milliseconds) that the supervisor waits
- # for child processes to terminate after sending it an exit signal (:shutdown).
+ # Specifies the maximum time (in milliseconds) that the Elixir supervisor waits
+ # for child channel processes to terminate after sending it an exit signal
+ # (:shutdown). This time is used by all gen_statem processes to perform clean up
+ # operations before shutting down.
channel_shutdown_tolerance: 10000
# Specifies the maximum drift time (in seconds) the channel process
@@ -42,6 +44,22 @@ channel_sender_ex:
# received by sender while on waiting state (no socket connection)
max_pending_queue: 100
+ # channel_shutdown_socket_disconnect: Defines the waiting time of the channel process
+ # after a socket disconnection, in case the client re-connects. The disconection can be
+ # clean or unclean. The channel process will wait for the client to re-connect before
+ #
+ # on_clean_close: time in seconds to wait before shutting down the channel process when a
+ # client explicitlly ends the socket connection (clean close). A value of 0, will
+ # terminate the channel process immediately. This value should never be greater than max_age.
+ #
+ # on_disconnection: time in seconds to wait before shutting down the channel process when the
+ # connectin between the client and server accidentally or unintendedlly is interrupted.
+ # A value of 0, will terminate the channel process immediately. This value should never
+ # be greater than max_age.
+ channel_shutdown_socket_disconnect:
+ on_clean_close: 30
+ on_disconnection: 300
+
no_start: false
topology:
strategy: Elixir.Cluster.Strategy.Gossip # for local development
diff --git a/channel-sender/docs/channel-state.md b/channel-sender/docs/channel-state.md
index 6c03f2d..69924d4 100644
--- a/channel-sender/docs/channel-state.md
+++ b/channel-sender/docs/channel-state.md
@@ -10,18 +10,26 @@ module for the implementation.
```mermaid
stateDiagram
[*] --> Waiting: POST /ext/channel
[channel process is created]
-Waiting --> Connected: Socket connection
created
-Connected --> Waiting: Socket connection
closed
-Waiting --> [*]: Socket (re)connection
window expires
[channel process terminated]
+Waiting --> Connected: Socket connected/re-connected
+Connected --> Waiting: Socket connection
closed or interrupted
+Waiting --> [*]: Socket connection
window expires
[channel process terminated]
```
### Steps
1. When the client (web or mobile) calls the registration endpoint `POST /ext/channel` a new channel process is started
- with its initial state **waiting**.
+ with its initial state `waiting`.
2. When the client (web or mobile) opens a socket connection with the server, internally the process state change to
- **connected**.
+ `connected`.
3. If the connection between client and server is interrupted, the process returns to `waiting` state.
+
The `waiting` state has a timer control, which expects a socket connection (or re-connection) within the time window
- defined by the `:max_age` parameter in the configuration (unit time is seconds). If no socket connection is made,
- within this window the process is stopped.
\ No newline at end of file
+ defined by the `channel_shutdown_socket_disconnect` parameter in the configuration yaml (unit time is seconds) in order to return to the `connected` state.
+
+ If socket the disconnection was clean (the client requested to close), then the process uses the time defined in te
+ configuration `channel_shutdown_socket_disconnect.on_clean_close` (default 30 seconds). If this parameter is zero (0)
+ the process is terminated inmediatlly.
+
+ If socket disconection wasn't clean (the client and server lost connection), then the process uses the time defined in te
+ configuration `channel_shutdown_socket_disconnect.on_disconnection` (default 300 seconds). If this parameter is zero (0)
+ the process is terminated inmediatlly.
\ No newline at end of file
diff --git a/channel-sender/lib/channel_sender_ex/application_config.ex b/channel-sender/lib/channel_sender_ex/application_config.ex
index 3193c91..4fe92be 100644
--- a/channel-sender/lib/channel_sender_ex/application_config.ex
+++ b/channel-sender/lib/channel_sender_ex/application_config.ex
@@ -102,6 +102,13 @@ defmodule ChannelSenderEx.ApplicationConfig do
Map.get(fetch(config, :channel_sender_ex), "max_pending_queue", 100)
)
+ channel_wait_times = Map.get(fetch(config, :channel_sender_ex),
+ "channel_shutdown_socket_disconnect", %{"on_clean_close" => 30, "on_disconnection" => 300})
+ Application.put_env(:channel_sender_ex, :channel_shutdown_on_clean_close,
+ Map.get(channel_wait_times, "on_clean_close", 30))
+ Application.put_env(:channel_sender_ex, :channel_shutdown_on_disconnection,
+ Map.get(channel_wait_times, "on_disconnection", 300))
+
Application.put_env(:channel_sender_ex, :topology, parse_libcluster_topology(config))
if config == %{} do
diff --git a/channel-sender/lib/channel_sender_ex/core/channel.ex b/channel-sender/lib/channel_sender_ex/core/channel.ex
index f6f5e12..a1d88fa 100644
--- a/channel-sender/lib/channel_sender_ex/core/channel.ex
+++ b/channel-sender/lib/channel_sender_ex/core/channel.ex
@@ -24,10 +24,11 @@ defmodule ChannelSenderEx.Core.Channel do
@type t() :: %ChannelSenderEx.Core.Channel.Data{
channel: String.t(),
application: String.t(),
- stop_cause: atom(),
socket: {pid(), reference()},
pending_ack: ChannelSenderEx.Core.Channel.pending_ack(),
pending_sending: ChannelSenderEx.Core.Channel.pending_sending(),
+ stop_cause: atom(),
+ socket_stop_cause: atom(),
user_ref: String.t()
}
@@ -37,6 +38,7 @@ defmodule ChannelSenderEx.Core.Channel do
pending_ack: BoundedMap.new(),
pending_sending: BoundedMap.new(),
stop_cause: nil,
+ socket_stop_cause: nil,
user_ref: ""
def new(channel, application, user_ref) do
@@ -47,6 +49,7 @@ defmodule ChannelSenderEx.Core.Channel do
pending_ack: BoundedMap.new(),
pending_sending: BoundedMap.new(),
stop_cause: nil,
+ socket_stop_cause: nil,
user_ref: user_ref
}
end
@@ -60,6 +63,13 @@ defmodule ChannelSenderEx.Core.Channel do
GenStateMachine.call(server, {:socket_connected, socket_pid}, timeout)
end
+ @doc """
+ operation to notify this server the reason why the socket was disconnected
+ """
+ def socket_disconnect_reason(server, reason, timeout \\ @on_connected_channel_reply_timeout) do
+ GenStateMachine.call(server, {:socket_disconnected_reason, reason}, timeout)
+ end
+
@doc """
operation to mark a message as acknowledged
"""
@@ -98,10 +108,17 @@ defmodule ChannelSenderEx.Core.Channel do
### WAITING STATE ####
### waiting state callbacks definitions ####
def waiting(:enter, _old_state, data) do
- # time to wait for the socket to be authenticated
- waiting_timeout = round(get_param(:max_age, 900) * 1000)
- Logger.info("Channel #{data.channel} entering waiting state and expecting a socket connection and authentication. max wait time: #{waiting_timeout} ms")
- {:keep_state, data, [{:state_timeout, waiting_timeout, :waiting_timeout}]}
+ # time to wait for the socket to be open (or re-opened) and authenticated
+ waiting_timeout = round(estimate_process_wait_time(data) * 1000)
+ case waiting_timeout do
+ 0 ->
+ Logger.info("Channel #{data.channel} will not remain in waiting state due calculated wait time is 0. Stopping now.")
+ {:stop, :normal, data}
+ _ ->
+ Logger.info("Channel #{data.channel} entering waiting state. Expecting a socket connection/authentication. max wait time: #{waiting_timeout} ms")
+ new_data = %{data | socket_stop_cause: nil}
+ {:keep_state, new_data, [{:state_timeout, waiting_timeout, :waiting_timeout}]}
+ end
end
## stop the process with a timeout cause if the socket is not
@@ -113,7 +130,7 @@ defmodule ChannelSenderEx.Core.Channel do
def waiting({:call, from}, {:socket_connected, socket_pid}, data) do
socket_ref = Process.monitor(socket_pid)
- new_data = %{data | socket: {socket_pid, socket_ref}}
+ new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil}
actions = [
_reply = {:reply, from, :ok}
@@ -173,9 +190,9 @@ defmodule ChannelSenderEx.Core.Channel do
:keep_state_and_data
end
- ################### END######################
- ### WAITING STATE ####
- ############################################
+ #############################################
+ ### CONNECTED STATE ####
+ #############################################
@type call() :: {:call, GenServer.from()}
@type state_return() :: :gen_statem.event_handler_result(Data.t())
@@ -186,6 +203,18 @@ defmodule ChannelSenderEx.Core.Channel do
{:keep_state_and_data, [{:state_timeout, refresh_timeout, :refresh_token_timeout}]}
end
+ # this method will be called when the socket is disconnected
+ # to inform this process about the disconnection reason
+ # this will be later used to define if this process will go back to the waiting state
+ # or if it will stop with a specific cause
+ def connected({:call, from}, {:socket_disconnected_reason, reason}, data) do
+ new_data = %{data | socket_stop_cause: reason}
+ actions = [
+ _reply = {:reply, from, :ok}
+ ]
+ {:keep_state, new_data, actions}
+ end
+
def connected(:state_timeout, :refresh_token_timeout, data) do
refresh_timeout = calculate_refresh_token_timeout()
message = new_token_message(data)
@@ -259,7 +288,7 @@ defmodule ChannelSenderEx.Core.Channel do
# reschedule the timer to keep retrying to deliver the message
next_delay = round(exp_back_off(get_param(:initial_redelivery_time, 900), 3_000, retries, 0.2))
- Logger.debug("Channel #{data.channel} redelivering message in #{next_delay} ms")
+ Logger.debug("Channel #{data.channel} redelivering message in #{next_delay} ms (retry #{retries})")
actions = [
_timeout =
{{:timeout, {:redelivery, ref}}, next_delay, retries + 1}
@@ -269,16 +298,12 @@ defmodule ChannelSenderEx.Core.Channel do
end
end
- ## Handle the case when the socket is disconnected. This method is called because the socket is monitored.
+ ## Handle info notification when socket process terminates. This method is called because the socket is monitored.
## via Process.monitor(socket_pid) in the waited/connected state.
def connected(:info, {:DOWN, _ref, :process, _object, _reason}, data) do
new_data = %{data | socket: nil}
- actions = []
-
- Logger.warning("Channel #{data.channel} detected socket disconnection, entering :waiting state")
-
- # returns to the waiting state
- {:next_state, :waiting, new_data, actions}
+ Logger.warning("Channel #{data.channel} detected socket close/disconnection. Will enter :waiting state")
+ {:next_state, :waiting, new_data, []}
end
# test this scenario and register a callback to receive twins_last_letter in connected state
@@ -304,6 +329,10 @@ defmodule ChannelSenderEx.Core.Channel do
ProtocolMessage.of(UUID.uuid4(:hex), ":n_token", new_token)
end
+ #########################################
+ ### Support functions ####
+ #########################################
+
@compile {:inline, send_message: 2}
defp send_message(%{socket: {socket_pid, _}}, message) do
# creates message to the expected format
@@ -366,6 +395,24 @@ defmodule ChannelSenderEx.Core.Channel do
round(max(min_timeout, token_validity - tolerance) * 1000)
end
+ defp estimate_process_wait_time(data) do
+ # when is a new socket connection this will resolve false
+ case socket_clean_disconnection?(data) do
+ true ->
+ get_param(:channel_shutdown_on_clean_close, 30)
+ false ->
+ # this time will also apply when socket the first time connected
+ get_param(:channel_shutdown_on_disconnection, 300)
+ end
+ end
+
+ defp socket_clean_disconnection?(data) do
+ case data.socket_stop_cause do
+ {:remote, 1000, _} -> true
+ _ -> false
+ end
+ end
+
defp get_param(param, def) do
RulesProvider.get(param)
rescue
diff --git a/channel-sender/lib/channel_sender_ex/core/pubsub/socket_event_bus.ex b/channel-sender/lib/channel_sender_ex/core/pubsub/socket_event_bus.ex
index aa4bac6..0bf96ab 100644
--- a/channel-sender/lib/channel_sender_ex/core/pubsub/socket_event_bus.ex
+++ b/channel-sender/lib/channel_sender_ex/core/pubsub/socket_event_bus.ex
@@ -6,10 +6,20 @@ defmodule ChannelSenderEx.Core.PubSub.SocketEventBus do
alias ChannelSenderEx.Core.Channel
alias ChannelSenderEx.Core.ChannelRegistry
+ # Notify the event of a socket connection. Receiving part is the channel process.
def notify_event({:connected, channel}, socket_pid) do
connect_channel(channel, socket_pid)
end
+ # Notify the event with the reason of a socket disconnection. Receiving part is
+ # the channel process. This will be used to determine the time the process
+ # will be waiting for the socket re-connection. Depending on configuration the
+ # waiting time may actually be zero and the process then shuts down inmediately.
+ # See config element: `channel_shutdown_socket_disconnect`
+ def notify_event({:socket_down_reason, channel_ref, reason}, _socket_pid) do
+ socket_disconnect_reason(channel_ref, reason)
+ end
+
def connect_channel(_, _, count \\ 0)
def connect_channel(_, _, 7), do: raise("No channel found")
@@ -25,4 +35,19 @@ defmodule ChannelSenderEx.Core.PubSub.SocketEventBus do
connect_channel(channel, socket_pid, count + 1)
end
end
+
+ defp socket_disconnect_reason(channel, reason) do
+ case look_channel(channel) do
+ pid when is_pid(pid) ->
+ Channel.socket_disconnect_reason(pid, reason)
+ :noproc -> :noproc
+ end
+ end
+
+ defp look_channel(channel) do
+ case ChannelRegistry.lookup_channel_addr(channel) do
+ pid when is_pid(pid) -> pid
+ :noproc -> :noproc
+ end
+ end
end
diff --git a/channel-sender/lib/channel_sender_ex/transport/socket.ex b/channel-sender/lib/channel_sender_ex/transport/socket.ex
index 8d5761d..da20d8d 100644
--- a/channel-sender/lib/channel_sender_ex/transport/socket.ex
+++ b/channel-sender/lib/channel_sender_ex/transport/socket.ex
@@ -135,6 +135,7 @@ defmodule ChannelSenderEx.Transport.Socket do
{channel, :connected, encoder, {application, user_ref, monitor_ref}, %{}}}
:unauthorized ->
+ Logger.error("Socket unable to authorize connection. Error: #{@invalid_secret_code}-invalid token for channel #{channel}")
{_commands = [{:close, @invalid_secret_code, "Invalid token for channel"}],
{channel, :unauthorized}}
end
@@ -230,7 +231,12 @@ defmodule ChannelSenderEx.Transport.Socket do
case state do
{channel_ref, _, _, _, _} ->
Logger.warning("Socket for channel #{channel_ref} terminated with reason: #{inspect(reason)}")
- :ok
+ socket_event_bus = get_param(:socket_event_bus, :noop)
+ case socket_event_bus do
+ :noop -> :ok
+ _ ->
+ socket_event_bus.notify_event({:socket_down_reason, channel_ref, reason}, self())
+ end
_ -> :ok
end
end
diff --git a/channel-sender/mix.exs b/channel-sender/mix.exs
index 443c3c6..123199a 100644
--- a/channel-sender/mix.exs
+++ b/channel-sender/mix.exs
@@ -4,7 +4,7 @@ defmodule ChannelSenderEx.MixProject do
def project do
[
app: :channel_sender_ex,
- version: "0.1.7",
+ version: "0.1.8",
elixir: "~> 1.16",
start_permanent: Mix.env() == :prod,
deps: deps(),
diff --git a/channel-sender/test/channel_sender_ex/application_config_test.exs b/channel-sender/test/channel_sender_ex/application_config_test.exs
index 7b467e2..ab96eef 100644
--- a/channel-sender/test/channel_sender_ex/application_config_test.exs
+++ b/channel-sender/test/channel_sender_ex/application_config_test.exs
@@ -1,6 +1,5 @@
defmodule ChannelSenderEx.ApplicationConfigTest do
use ExUnit.Case
- import Mock
alias ChannelSenderEx.ApplicationConfig
diff --git a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs
index ed702f6..f58ee14 100644
--- a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs
+++ b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs
@@ -18,6 +18,9 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do
:on_connected_channel_reply_timeout,
2000)
+ Application.put_env(:channel_sender_ex, :channel_shutdown_on_clean_close, 900)
+ Application.put_env(:channel_sender_ex, :channel_shutdown_on_disconnection, 900)
+
Application.put_env(:channel_sender_ex, :secret_base, {
"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc",
"socket auth"
@@ -57,6 +60,8 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do
port = :ranch.get_port(:external_server)
on_exit(fn ->
+ Application.delete_env(:channel_sender_ex, :channel_shutdown_on_clean_close)
+ Application.delete_env(:channel_sender_ex, :channel_shutdown_on_disconnection)
:ok = :cowboy.stop_listener(:external_server)
end)
@@ -80,7 +85,6 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do
channel: channel,
secret: secret
} do
-
{conn, stream} = assert_connect_and_authenticate(port, channel, secret)
assert {:accepted_connected, _, _} = deliver_message(channel)
assert_receive {:gun_ws, ^conn, ^stream, {:text, _data_string}}
@@ -89,8 +93,27 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do
assert {:accepted_waiting, _, _} = deliver_message(channel)
end
+ test "Should do no waiting when connection closes clean", %{
+ port: port,
+ channel: channel,
+ secret: secret
+ } do
+ Helper.compile(:channel_sender_ex, channel_shutdown_on_clean_close: 0)
+ Helper.compile(:channel_sender_ex, channel_shutdown_on_disconnection: 0)
+ {conn, stream} = assert_connect_and_authenticate(port, channel, secret)
+ assert {:accepted_connected, _, _} = deliver_message(channel)
+ assert_receive {:gun_ws, ^conn, ^stream, {:text, _data_string}}
+
+ channel_pid = ChannelRegistry.lookup_channel_addr(channel)
+ :gun.close(conn)
+
+ Process.sleep(500)
+ assert Process.alive?(channel_pid) == false
+ end
+
test "Should not restart channel when terminated normal (Waiting timeout)" do
- Helper.compile(:channel_sender_ex, max_age: 1)
+ Helper.compile(:channel_sender_ex, channel_shutdown_on_disconnection: 1)
+
{channel, _secret} = ChannelAuthenticator.create_channel("App1", "User1234")
channel_pid = ChannelRegistry.lookup_channel_addr(channel)
@@ -126,7 +149,8 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do
assert [{pid, _}] = Horde.Registry.lookup(ChannelRegistry.via_tuple("channel_ref", :reg1))
{_, %{pending_sending: {pending_msg, _}}} = :sys.get_state(pid)
- assert %{"42" => msg1, "82" => msg2} = pending_msg
+ assert Map.get(pending_msg, "42") == msg1
+ assert Map.get(pending_msg, "82") == msg2
end
defp deliver_message(channel, message_id \\ "42") do
diff --git a/channel-sender/test/channel_sender_ex/core/channel_test.exs b/channel-sender/test/channel_sender_ex/core/channel_test.exs
index bef7291..029a868 100644
--- a/channel-sender/test/channel_sender_ex/core/channel_test.exs
+++ b/channel-sender/test/channel_sender_ex/core/channel_test.exs
@@ -1,11 +1,10 @@
Code.compiler_options(ignore_module_conflict: true)
defmodule ChannelSenderEx.Core.ChannelTest do
- use ExUnit.Case
+ use ExUnit.Case, sync: true
import Mock
alias ChannelSenderEx.Core.Channel
- alias ChannelSenderEx.Core.Channel.Data
alias ChannelSenderEx.Core.ChannelIDGenerator
alias ChannelSenderEx.Core.ProtocolMessage
alias ChannelSenderEx.Core.RulesProvider
@@ -22,6 +21,8 @@ defmodule ChannelSenderEx.Core.ChannelTest do
:on_connected_channel_reply_timeout,
2000)
+ Application.put_env(:channel_sender_ex, :channel_shutdown_on_clean_close, 900)
+ Application.put_env(:channel_sender_ex, :channel_shutdown_on_disconnection, 900)
Application.put_env(:channel_sender_ex, :secret_base, {
"aV4ZPOf7T7HX6GvbhwyBlDM8B9jfeiwi+9qkBnjXxUZXqAeTrehojWKHkV3U0kGc",
"socket auth"
@@ -37,6 +38,14 @@ defmodule ChannelSenderEx.Core.ChannelTest do
user_ref = "user234"
channel_ref = ChannelIDGenerator.generate_channel_id(app, user_ref)
+ Application.put_env(:channel_sender_ex, :max_unacknowledged_retries, 3)
+ Helper.compile(:channel_sender_ex)
+
+ on_exit(fn ->
+ Application.delete_env(:channel_sender_ex, :max_unacknowledged_retries)
+ Helper.compile(:channel_sender_ex)
+ end)
+
{:ok,
init_args: {channel_ref, app, user_ref},
message: %{
@@ -98,6 +107,26 @@ defmodule ChannelSenderEx.Core.ChannelTest do
Process.exit(pid, :kill)
end
+ test "Should not re-deliver message when ack retries is reached", %{init_args: init_args, message: message} do
+ Application.put_env(:channel_sender_ex, :max_unacknowledged_retries, 2)
+ Helper.compile(:channel_sender_ex)
+
+ {:ok, pid} = start_channel_safe(init_args)
+ :ok = Channel.socket_connected(pid, self())
+ message_to_send = ProtocolMessage.to_protocol_message(message)
+ :accepted_connected = Channel.deliver_message(pid, message_to_send)
+ assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 600
+ assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 1000
+ assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 1500
+ refute_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send}, 2000
+ Process.exit(pid, :kill)
+
+ on_exit(fn ->
+ Application.delete_env(:channel_sender_ex, :max_unacknowledged_retries)
+ Helper.compile(:channel_sender_ex)
+ end)
+ end
+
test "Should send new token in correct interval", %{init_args: init_args = {channel, _, _}} do
Helper.compile(:channel_sender_ex, max_age: 2)
{:ok, pid} = start_channel_safe(init_args)
@@ -192,7 +221,7 @@ defmodule ChannelSenderEx.Core.ChannelTest do
test "Should terminate channel when no socket connected (Waiting timeout)", %{
init_args: init_args
} do
- Helper.compile(:channel_sender_ex, max_age: 1)
+ Helper.compile(:channel_sender_ex, channel_shutdown_on_disconnection: 1)
{:ok, channel_pid} = start_channel_safe(init_args)
:sys.trace(channel_pid, true)
assert Process.alive? channel_pid