Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

E2E: Consistent and configurable timeouts #797

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions tools/astarte_e2e/lib/astarte_e2e/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ defmodule AstarteE2E.Client do
|> GenSocketClient.call(:wait_for_connection, :infinity)
end

def notify_startup_timeout(realm, device_id) do
via_tuple(realm, device_id)
|> GenSocketClient.call(:notify_startup_timeout, :infinity)
end

defp join_topic(transport, state) do
topic =
state
Expand Down Expand Up @@ -588,4 +593,9 @@ defmodule AstarteE2E.Client do
{:noreply, new_state}
end
end

def handle_call(:notify_startup_timeout, from, _transport, state) do
ServiceNotifier.notify_timeout()
{:reply, :ok, state}
end
end
26 changes: 25 additions & 1 deletion tools/astarte_e2e/lib/astarte_e2e/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ defmodule AstarteE2E.Config do
type: :integer,
default: 60

@envdoc """
Number of checks performed before declaring a timeout at startup.
Defaults to 1.
"""
app_env :check_limit, :astarte_e2e, :check_limit,
os_env: "E2E_CHECK_LIMIT",
type: :integer

@envdoc """
Time interval before declaring a timeout at startup (in seconds).
This option takes priority if both E2E_CHECK_LIMIT and E2E_STARTUP_TIMEOUT_SECONDS are defined.
"""
app_env :startup_timeout_s, :astarte_e2e, :startup_timeout_s,
os_env: "E2E_STARTUP_TIMEOUT_SECONDS",
type: :integer

@envdoc "The port used to expose AstarteE2E's metrics. Defaults to 4010."
app_env :port, :astarte_e2e, :port,
os_env: "E2E_PORT",
Expand Down Expand Up @@ -229,9 +245,17 @@ defmodule AstarteE2E.Config do

@spec scheduler_opts() :: scheduler_options()
def scheduler_opts do
check_interval = check_interval_s!()
timeout_s = startup_timeout_s!()

limit = check_limit!()
limit_from_timeout = timeout_s && div(timeout_s, check_interval)
allowed_retries = max(limit || limit_from_timeout || 1, 1)

[
check_interval_s: check_interval_s!(),
check_interval_s: check_interval,
check_repetitions: check_repetitions!(),
timeout: {:active, allowed_retries},
realm: realm!(),
device_id: device_id!()
]
Expand Down
59 changes: 43 additions & 16 deletions tools/astarte_e2e/lib/astarte_e2e/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#

defmodule AstarteE2E.Scheduler do
alias AstarteE2E.Utils
alias AstarteE2E.{Client, Utils}
require Logger

use GenServer, restart: :transient
Expand All @@ -37,7 +37,18 @@ defmodule AstarteE2E.Scheduler do

check_repetitions = Keyword.fetch!(opts, :check_repetitions)

state = %{check_repetitions: check_repetitions, check_interval_ms: check_interval_ms}
realm = Keyword.fetch(opts, :realm)
device_id = Keyword.fetch!(opts, :device_id)
timeout = Keyword.fetch!(opts, :timeout)

state = %{
check_repetitions: check_repetitions,
check_interval_ms: check_interval_ms,
realm: realm,
device_id: device_id,
timeout: timeout
}

Process.send_after(self(), :do_perform_check, check_interval_ms)

{:ok, state}
Expand All @@ -55,24 +66,26 @@ defmodule AstarteE2E.Scheduler do

@impl true
def handle_info(:do_perform_check, state) do
return_val =
case AstarteE2E.perform_check() do
:ok ->
handle_successful_job(state)
Process.send_after(self(), :do_perform_check, state.check_interval_ms)

{:error, :timeout} ->
handle_timed_out_job(state)
check_result = AstarteE2E.perform_check()

{:error, :not_connected} ->
{:noreply, state}
updated_state = update_in(state.timeout, &maybe_timeout(&1, state, check_result))

e ->
Logger.warn("Unhandled condition #{inspect(e)}. Pretending everything is ok.")
{:noreply, state}
end
case check_result do
:ok ->
handle_successful_job(updated_state)

Process.send_after(self(), :do_perform_check, state.check_interval_ms)
return_val
{:error, :timeout} ->
handle_timed_out_job(updated_state)

{:error, :not_connected} ->
{:noreply, updated_state}

e ->
Logger.warn("Unhandled condition #{inspect(e)}. Pretending everything is ok.")
{:noreply, updated_state}
end
end

defp handle_successful_job(state) do
Expand Down Expand Up @@ -105,6 +118,20 @@ defmodule AstarteE2E.Scheduler do
end
end

defp maybe_timeout(timeout, state, check_result) do
cond do
timeout == :inactive or check_result == :ok -> :inactive
timeout == {:active, 1} -> call_timeout_and_set_inactive(state)
{:active, x} = timeout -> {:active, x - 1}
end
end

defp call_timeout_and_set_inactive(state) do
%{realm: {:ok, realm}, device_id: device_id} = state
Client.notify_startup_timeout(realm, device_id)
:inactive
end

defp via_tuple(realm, device_id) do
{:via, Registry, {Registry.AstarteE2E, {:scheduler, realm, device_id}}}
end
Expand Down
11 changes: 8 additions & 3 deletions tools/astarte_e2e/lib/astarte_e2e/service_notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ defmodule AstarteE2E.ServiceNotifier do
:gen_statem.call(__MODULE__, :notify_service_up)
end

def notify_timeout do
:gen_statem.call(__MODULE__, :timeout)
end

defp deliver(%Bamboo.Email{} = email) do
service_notifier_config = Config.service_notifier_config()

Expand Down Expand Up @@ -82,10 +86,10 @@ defmodule AstarteE2E.ServiceNotifier do
mail_subject: mail_subject
}

{:ok, :starting, data, [{:state_timeout, 60_000, nil}]}
{:ok, :starting, data}
end

def starting(:state_timeout, _content, %{mail_subject: mail_subject} = data) do
def starting({:call, from}, :timeout, %{mail_subject: mail_subject} = data) do
reason = "Timeout at startup"

event_id = Hukai.generate("%a-%A")
Expand All @@ -107,7 +111,8 @@ defmodule AstarteE2E.ServiceNotifier do
failure_id: event_id
)

{:next_state, :service_down, updated_data}
actions = [{:reply, from, :ok}]
{:next_state, :service_down, updated_data, actions}
end

def starting({:call, from}, :notify_service_up, data) do
Expand Down