feat: changed how much time channel remains in waiting state (#93)
gabheadz authored Dec 12, 2024
1 parent 8b24404 commit 7152ca3
Showing 10 changed files with 198 additions and 35 deletions.
22 changes: 20 additions & 2 deletions channel-sender/config/config-local.yaml
@@ -16,8 +16,10 @@ channel_sender_ex:
# before closing the channel
socket_idle_timeout: 30000

# Specifies the maximum time (in milliseconds) that the supervisor waits
# for child processes to terminate after sending it an exit signal (:shutdown).
# Specifies the maximum time (in milliseconds) that the Elixir supervisor waits
# for child channel processes to terminate after sending them an exit signal
# (:shutdown). All gen_statem processes use this time to perform cleanup
# operations before shutting down.
channel_shutdown_tolerance: 10000

# Specifies the maximum drift time (in seconds) the channel process
@@ -42,6 +44,22 @@ channel_sender_ex:
# received by sender while on waiting state (no socket connection)
max_pending_queue: 100

# channel_shutdown_socket_disconnect: Defines how long the channel process waits
# after a socket disconnection, in case the client re-connects. The disconnection can be
# clean or unclean. The channel process waits for the client to re-connect before shutting down.
#
# on_clean_close: time in seconds to wait before shutting down the channel process when a
# client explicitly ends the socket connection (clean close). A value of 0 will
# terminate the channel process immediately. This value should never be greater than max_age.
#
# on_disconnection: time in seconds to wait before shutting down the channel process when the
# connection between the client and server is interrupted accidentally or unintentionally.
# A value of 0 will terminate the channel process immediately. This value should never
# be greater than max_age.
channel_shutdown_socket_disconnect:
on_clean_close: 30
on_disconnection: 300

no_start: false
topology:
strategy: Elixir.Cluster.Strategy.Gossip # for local development
22 changes: 15 additions & 7 deletions channel-sender/docs/channel-state.md
@@ -10,18 +10,26 @@ module for the implementation.
```mermaid
stateDiagram
[*] --> Waiting: POST /ext/channel<br>[channel process is created]
Waiting --> Connected: Socket connection<br>created
Connected --> Waiting: Socket connection <br> closed
Waiting --> [*]: Socket (re)connection<br> window expires<br>[channel process terminated]
Waiting --> Connected: Socket connected/re-connected
Connected --> Waiting: Socket connection <br> closed or interrupted
Waiting --> [*]: Socket connection<br> window expires<br>[channel process terminated]
```

### Steps

1. When the client (web or mobile) calls the registration endpoint `POST /ext/channel` a new channel process is started
with its initial state **waiting**.
with its initial state `waiting`.
2. When the client (web or mobile) opens a socket connection with the server, internally the process state changes to
**connected**.
`connected`.
3. If the connection between client and server is interrupted, the process returns to `waiting` state.

The `waiting` state has a timer control, which expects a socket connection (or re-connection) within the time window
defined by the `:max_age` parameter in the configuration (unit time is seconds). If no socket connection is made,
within this window the process is stopped.
defined by the `channel_shutdown_socket_disconnect` parameter in the configuration yaml (unit time is seconds) in order to return to the `connected` state.

If the socket disconnection was clean (the client requested to close), then the process uses the time defined in the
configuration `channel_shutdown_socket_disconnect.on_clean_close` (default 30 seconds). If this parameter is zero (0)
the process is terminated immediately.

If the socket disconnection wasn't clean (the client and server lost connection), then the process uses the time defined in the
configuration `channel_shutdown_socket_disconnect.on_disconnection` (default 300 seconds). If this parameter is zero (0)
the process is terminated immediately.
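
The selection rule described above can be sketched in isolation. This is a minimal illustration, not the project's implementation: the module `WaitTimeSketch`, its function names, and the plain map used for configuration are hypothetical. The `{:remote, 1000, _}` pattern for a clean close is the one matched in this commit's `channel.ex`.

```elixir
defmodule WaitTimeSketch do
  # Hypothetical defaults mirroring the documented values (in seconds).
  @defaults %{on_clean_close: 30, on_disconnection: 300}

  # Returns the waiting window in milliseconds for a given socket stop cause.
  # A stop cause of {:remote, 1000, _} is treated as a clean close; anything
  # else (including nil, when no socket has disconnected yet) is treated as
  # an unclean disconnection.
  def wait_ms(stop_cause, config \\ @defaults) do
    seconds =
      case stop_cause do
        {:remote, 1000, _} -> config.on_clean_close
        _ -> config.on_disconnection
      end

    round(seconds * 1000)
  end

  # Decides what happens on entering the waiting state:
  # stop right away when the window is zero, otherwise wait that long.
  def on_enter_waiting(stop_cause, config \\ @defaults) do
    case wait_ms(stop_cause, config) do
      0 -> :stop
      ms -> {:wait, ms}
    end
  end
end
```

With the defaults, `WaitTimeSketch.on_enter_waiting({:remote, 1000, ""})` yields a 30-second window and any other cause yields a 300-second window. Setting either value to 0 makes the sketch return `:stop`, which corresponds to the process terminating immediately.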
7 changes: 7 additions & 0 deletions channel-sender/lib/channel_sender_ex/application_config.ex
@@ -102,6 +102,13 @@ defmodule ChannelSenderEx.ApplicationConfig do
Map.get(fetch(config, :channel_sender_ex), "max_pending_queue", 100)
)

channel_wait_times = Map.get(fetch(config, :channel_sender_ex),
"channel_shutdown_socket_disconnect", %{"on_clean_close" => 30, "on_disconnection" => 300})
Application.put_env(:channel_sender_ex, :channel_shutdown_on_clean_close,
Map.get(channel_wait_times, "on_clean_close", 30))
Application.put_env(:channel_sender_ex, :channel_shutdown_on_disconnection,
Map.get(channel_wait_times, "on_disconnection", 300))

Application.put_env(:channel_sender_ex, :topology, parse_libcluster_topology(config))

if config == %{} do
81 changes: 64 additions & 17 deletions channel-sender/lib/channel_sender_ex/core/channel.ex
@@ -24,10 +24,11 @@ defmodule ChannelSenderEx.Core.Channel do
@type t() :: %ChannelSenderEx.Core.Channel.Data{
channel: String.t(),
application: String.t(),
stop_cause: atom(),
socket: {pid(), reference()},
pending_ack: ChannelSenderEx.Core.Channel.pending_ack(),
pending_sending: ChannelSenderEx.Core.Channel.pending_sending(),
stop_cause: atom(),
socket_stop_cause: atom(),
user_ref: String.t()
}

@@ -37,6 +38,7 @@ defmodule ChannelSenderEx.Core.Channel do
pending_ack: BoundedMap.new(),
pending_sending: BoundedMap.new(),
stop_cause: nil,
socket_stop_cause: nil,
user_ref: ""

def new(channel, application, user_ref) do
@@ -47,6 +49,7 @@ defmodule ChannelSenderEx.Core.Channel do
pending_ack: BoundedMap.new(),
pending_sending: BoundedMap.new(),
stop_cause: nil,
socket_stop_cause: nil,
user_ref: user_ref
}
end
@@ -60,6 +63,13 @@ defmodule ChannelSenderEx.Core.Channel do
GenStateMachine.call(server, {:socket_connected, socket_pid}, timeout)
end

@doc """
operation to notify this server the reason why the socket was disconnected
"""
def socket_disconnect_reason(server, reason, timeout \\ @on_connected_channel_reply_timeout) do
GenStateMachine.call(server, {:socket_disconnected_reason, reason}, timeout)
end

@doc """
operation to mark a message as acknowledged
"""
@@ -98,10 +108,17 @@ defmodule ChannelSenderEx.Core.Channel do
### WAITING STATE ####
### waiting state callbacks definitions ####
def waiting(:enter, _old_state, data) do
# time to wait for the socket to be authenticated
waiting_timeout = round(get_param(:max_age, 900) * 1000)
Logger.info("Channel #{data.channel} entering waiting state and expecting a socket connection and authentication. max wait time: #{waiting_timeout} ms")
{:keep_state, data, [{:state_timeout, waiting_timeout, :waiting_timeout}]}
# time to wait for the socket to be open (or re-opened) and authenticated
waiting_timeout = round(estimate_process_wait_time(data) * 1000)
case waiting_timeout do
0 ->
Logger.info("Channel #{data.channel} will not remain in waiting state because the calculated wait time is 0. Stopping now.")
{:stop, :normal, data}
_ ->
Logger.info("Channel #{data.channel} entering waiting state. Expecting a socket connection/authentication. max wait time: #{waiting_timeout} ms")
new_data = %{data | socket_stop_cause: nil}
{:keep_state, new_data, [{:state_timeout, waiting_timeout, :waiting_timeout}]}
end
end

## stop the process with a timeout cause if the socket is not
@@ -113,7 +130,7 @@ defmodule ChannelSenderEx.Core.Channel do

def waiting({:call, from}, {:socket_connected, socket_pid}, data) do
socket_ref = Process.monitor(socket_pid)
new_data = %{data | socket: {socket_pid, socket_ref}}
new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil}

actions = [
_reply = {:reply, from, :ok}
@@ -173,9 +190,9 @@ defmodule ChannelSenderEx.Core.Channel do
:keep_state_and_data
end

################### END######################
### WAITING STATE ####
############################################
#############################################
### CONNECTED STATE ####
#############################################

@type call() :: {:call, GenServer.from()}
@type state_return() :: :gen_statem.event_handler_result(Data.t())
@@ -186,6 +203,18 @@ defmodule ChannelSenderEx.Core.Channel do
{:keep_state_and_data, [{:state_timeout, refresh_timeout, :refresh_token_timeout}]}
end

# this method will be called when the socket is disconnected
# to inform this process about the disconnection reason
# this will be later used to define if this process will go back to the waiting state
# or if it will stop with a specific cause
def connected({:call, from}, {:socket_disconnected_reason, reason}, data) do
new_data = %{data | socket_stop_cause: reason}
actions = [
_reply = {:reply, from, :ok}
]
{:keep_state, new_data, actions}
end

def connected(:state_timeout, :refresh_token_timeout, data) do
refresh_timeout = calculate_refresh_token_timeout()
message = new_token_message(data)
@@ -259,7 +288,7 @@ defmodule ChannelSenderEx.Core.Channel do

# reschedule the timer to keep retrying to deliver the message
next_delay = round(exp_back_off(get_param(:initial_redelivery_time, 900), 3_000, retries, 0.2))
Logger.debug("Channel #{data.channel} redelivering message in #{next_delay} ms")
Logger.debug("Channel #{data.channel} redelivering message in #{next_delay} ms (retry #{retries})")
actions = [
_timeout =
{{:timeout, {:redelivery, ref}}, next_delay, retries + 1}
@@ -269,16 +298,12 @@ defmodule ChannelSenderEx.Core.Channel do
end
end

## Handle the case when the socket is disconnected. This method is called because the socket is monitored.
## Handle info notification when socket process terminates. This method is called because the socket is monitored
## via Process.monitor(socket_pid) in the waiting/connected state.
def connected(:info, {:DOWN, _ref, :process, _object, _reason}, data) do
new_data = %{data | socket: nil}
actions = []

Logger.warning("Channel #{data.channel} detected socket disconnection, entering :waiting state")

# returns to the waiting state
{:next_state, :waiting, new_data, actions}
Logger.warning("Channel #{data.channel} detected socket close/disconnection. Will enter :waiting state")
{:next_state, :waiting, new_data, []}
end

# test this scenario and register a callback to receive twins_last_letter in connected state
@@ -304,6 +329,10 @@ defmodule ChannelSenderEx.Core.Channel do
ProtocolMessage.of(UUID.uuid4(:hex), ":n_token", new_token)
end

#########################################
### Support functions ####
#########################################

@compile {:inline, send_message: 2}
defp send_message(%{socket: {socket_pid, _}}, message) do
# creates message to the expected format
@@ -366,6 +395,24 @@ defmodule ChannelSenderEx.Core.Channel do
round(max(min_timeout, token_validity - tolerance) * 1000)
end

defp estimate_process_wait_time(data) do
# for a new socket connection this resolves to false
case socket_clean_disconnection?(data) do
true ->
get_param(:channel_shutdown_on_clean_close, 30)
false ->
# this time also applies while waiting for the first socket connection
get_param(:channel_shutdown_on_disconnection, 300)
end
end

defp socket_clean_disconnection?(data) do
case data.socket_stop_cause do
{:remote, 1000, _} -> true
_ -> false
end
end

defp get_param(param, def) do
RulesProvider.get(param)
rescue
@@ -6,10 +6,20 @@ defmodule ChannelSenderEx.Core.PubSub.SocketEventBus do
alias ChannelSenderEx.Core.Channel
alias ChannelSenderEx.Core.ChannelRegistry

# Notify the event of a socket connection. Receiving part is the channel process.
def notify_event({:connected, channel}, socket_pid) do
connect_channel(channel, socket_pid)
end

# Notify the event with the reason of a socket disconnection. Receiving part is
# the channel process. This will be used to determine the time the process
# will be waiting for the socket re-connection. Depending on configuration the
# waiting time may actually be zero and the process then shuts down immediately.
# See config element: `channel_shutdown_socket_disconnect`
def notify_event({:socket_down_reason, channel_ref, reason}, _socket_pid) do
socket_disconnect_reason(channel_ref, reason)
end

def connect_channel(_, _, count \\ 0)
def connect_channel(_, _, 7), do: raise("No channel found")

@@ -25,4 +35,19 @@ defmodule ChannelSenderEx.Core.PubSub.SocketEventBus do
connect_channel(channel, socket_pid, count + 1)
end
end

defp socket_disconnect_reason(channel, reason) do
case look_channel(channel) do
pid when is_pid(pid) ->
Channel.socket_disconnect_reason(pid, reason)
:noproc -> :noproc
end
end

defp look_channel(channel) do
case ChannelRegistry.lookup_channel_addr(channel) do
pid when is_pid(pid) -> pid
:noproc -> :noproc
end
end
end
8 changes: 7 additions & 1 deletion channel-sender/lib/channel_sender_ex/transport/socket.ex
@@ -135,6 +135,7 @@ defmodule ChannelSenderEx.Transport.Socket do
{channel, :connected, encoder, {application, user_ref, monitor_ref}, %{}}}

:unauthorized ->
Logger.error("Socket unable to authorize connection. Error: #{@invalid_secret_code}-invalid token for channel #{channel}")
{_commands = [{:close, @invalid_secret_code, "Invalid token for channel"}],
{channel, :unauthorized}}
end
@@ -230,7 +231,12 @@ defmodule ChannelSenderEx.Transport.Socket do
case state do
{channel_ref, _, _, _, _} ->
Logger.warning("Socket for channel #{channel_ref} terminated with reason: #{inspect(reason)}")
:ok
socket_event_bus = get_param(:socket_event_bus, :noop)
case socket_event_bus do
:noop -> :ok
_ ->
socket_event_bus.notify_event({:socket_down_reason, channel_ref, reason}, self())
end
_ -> :ok
end
end
2 changes: 1 addition & 1 deletion channel-sender/mix.exs
@@ -4,7 +4,7 @@ defmodule ChannelSenderEx.MixProject do
def project do
[
app: :channel_sender_ex,
version: "0.1.7",
version: "0.1.8",
elixir: "~> 1.16",
start_permanent: Mix.env() == :prod,
deps: deps(),
@@ -1,6 +1,5 @@
defmodule ChannelSenderEx.ApplicationConfigTest do
use ExUnit.Case
import Mock

alias ChannelSenderEx.ApplicationConfig
