Skip to content

Commit

Permalink
Merge pull request #4400 from esl/fix-event_not_registered-on-shutdown
Browse files Browse the repository at this point in the history
Fix event not registered on shutdown
  • Loading branch information
chrzaszcz authored Dec 2, 2024
2 parents 2bde4e7 + 87f7bf4 commit a8aa910
Show file tree
Hide file tree
Showing 17 changed files with 301 additions and 70 deletions.
1 change: 1 addition & 0 deletions big_tests/default.spec
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
{suites, "tests", cets_disco_SUITE}.
{suites, "tests", start_node_id_SUITE}.
{suites, "tests", tr_util_SUITE}.
{suites, "tests", shutdown_SUITE}.

{config, ["test.config"]}.
{logdir, "ct_report"}.
Expand Down
1 change: 1 addition & 0 deletions big_tests/dynamic_domains.spec
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
{suites, "tests", cets_disco_SUITE}.
{suites, "tests", start_node_id_SUITE}.
{suites, "tests", tr_util_SUITE}.
{suites, "tests", shutdown_SUITE}.

{config, ["dynamic_domains.config", "test.config"]}.

Expand Down
8 changes: 8 additions & 0 deletions big_tests/test.config
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,13 @@
{server, <<"localhost">>},
{password, <<"password">>},
{port, 5252}]},
{adam, [ %% used in mod_global_distrib_SUITE, websockets
{username, <<"adam">>},
{server, <<"localhost">>},
{password, <<"password">>},
{transport, escalus_ws},
{port, 5272},
{wspath, <<"/ws-xmpp">>}]},
{neustradamus, [
{username, <<"neustradamus">>},
{server, <<"localhost">>},
Expand Down Expand Up @@ -348,6 +355,7 @@
scope = \"global\"
workers = 10
strategy = \"random_worker\"
connection.database = {{redis_database_number}}
[outgoing_pools.rdbms.default]
scope = \"global\"
workers = 5
Expand Down
46 changes: 39 additions & 7 deletions big_tests/tests/mod_global_distrib_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -688,19 +688,26 @@ test_component_disconnect(Config) ->
test_location_disconnect(Config) ->
try
escalus:fresh_story(
Config, [{alice, 1}, {eve, 1}],
fun(Alice, Eve) ->
Config, [{alice, 1}, {eve, 1}, {adam, 1}],
fun(Alice, Eve, Adam) ->
escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),

escalus_client:wait_for_stanza(Eve),

escalus_client:send(Alice, escalus_stanza:chat_to(Adam, <<"Hi, Adam, from Europe1!">>)),
escalus_client:wait_for_stanza(Adam),

print_sessions_debug_info(asia_node),
ok = rpc(asia_node, application, stop, [mongooseim]),
%% TODO: Stopping mongooseim alone should probably stop connections too
ok = rpc(asia_node, application, stop, [ranch]),

escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)),
Error = escalus:wait_for_stanza(Alice),
escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error),

escalus_client:send(Alice, escalus_stanza:chat_to(Adam, <<"Hi, Adam, again!">>)),
Error2 = escalus:wait_for_stanza(Alice),
escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error2)
end)
after
rpc(asia_node, application, start, [ranch]),
Expand Down Expand Up @@ -1410,6 +1417,27 @@ can_connect_to_port(Port) ->
false
end.

%% Prints information about the active sessions.
%%
%% Writes to the Common Test log, for the node identified by `NodeName':
%%  - its Erlang node name and the nodes it is connected to,
%%  - the children of `mongoose_c2s_sup',
%%  - the full session list reported by `ejabberd_sm',
%%  - the node each session pid lives on, and `process_info/1' for each pid.
%% Always returns `ok'.
print_sessions_debug_info(NodeName) ->
    Node = rpc(NodeName, erlang, node, []),
    Nodes = rpc(NodeName, erlang, nodes, []),
    ct:log("name=~p, erlang_node=~p, other_nodes=~p", [NodeName, Node, Nodes]),

    Children = rpc(NodeName, supervisor, which_children, [mongoose_c2s_sup]),
    ct:log("C2S processes under a supervisor ~p", [Children]),

    Sessions = rpc(NodeName, ejabberd_sm, get_full_session_list, []),
    ct:log("C2S processes in the session manager ~p", [Sessions]),

    %% NOTE(review): assumes element 2 of a session is a {_, Pid} session id -
    %% confirm against the session record definition.
    Sids = [element(2, Session) || Session <- Sessions],
    Pids = [Pid || {_, Pid} <- Sids],
    PidNodes = [{Pid, node(Pid)} || Pid <- Pids],
    ct:log("Pids on nodes ~p", [PidNodes]),

    %% Each pid is inspected on the node that owns it. The generator variable is
    %% named PidNode so that it does not shadow Node bound above.
    Info = [{Pid, rpc:call(PidNode, erlang, process_info, [Pid])}
            || {Pid, PidNode} <- PidNodes],
    ct:log("Processes info ~p", [Info]),
    ok.

%% -----------------------------------------------------------------------
%% Custom log levels for GD modules during the tests

Expand All @@ -1433,10 +1461,14 @@ custom_loglevels() ->
{mod_global_distrib_connection, debug},
%% to check if gc or refresh is triggered
{mod_global_distrib_server_mgr, info},
%% To debug incoming connections
%% To debug incoming connections
% {mod_global_distrib_receiver, info},
%% to debug global session set/delete
{mod_global_distrib_mapping, debug}
%% to debug global session set/delete
{mod_global_distrib_mapping, debug},
%% To log make_error_reply calls
{jlib, debug},
%% to log sm_route
{ejabberd_sm, debug}
].

test_hosts() -> [mim, mim2, reg].
139 changes: 139 additions & 0 deletions big_tests/tests/shutdown_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
-module(shutdown_SUITE).

-compile([export_all, nowarn_export_all]).
-import(distributed_helper, [mim/0, rpc/4]).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("exml/include/exml.hrl").

%% Common Test entry point: the suite consists of the single `main' group.
all() ->
    MainGroup = {group, main},
    [MainGroup].

%% Group definitions: `main' runs every case from cases/0 with no group properties.
groups() ->
    MainCases = cases(),
    [{main, [], MainCases}].

%% Test cases that belong to the `main' group, in execution order.
cases() ->
    [
        shutdown,
        client_tries_to_connect_before_listener_stop
    ].

%% Suite setup: injects this module via mongoose_helper and initialises escalus.
%% NOTE(review): the injection is presumably needed so that the fun built in
%% block_listener/0 can run on the MongooseIM node - confirm in mongoose_helper.
init_per_suite(Config0) ->
    mongoose_helper:inject_module(?MODULE),
    Config1 = escalus:init_per_suite(Config0),
    Config1.

%% Suite teardown: delegates entirely to escalus.
end_per_suite(Config0) ->
    Result = escalus:end_per_suite(Config0),
    Result.

%% Group setup: creates the `geralt_s' and `alice' users and returns Config unchanged.
init_per_group(_GroupName, Config) ->
    Users = escalus:get_users([geralt_s, alice]),
    escalus:create_users(Config, Users),
    Config.

%% Group teardown: deletes the users created in init_per_group/2 and returns
%% Config unchanged.
end_per_group(_GroupName, Config) ->
    Users = escalus:get_users([geralt_s, alice]),
    escalus:delete_users(Config, Users),
    Config.

%% Per-testcase setup.
%% The `shutdown' case additionally starts the logger backend used to capture
%% log messages; every other case only needs the escalus setup.
init_per_testcase(shutdown, Config) ->
    logger_ct_backend:start(),
    escalus:init_per_testcase(shutdown, Config);
init_per_testcase(TestCase, Config) ->
    escalus:init_per_testcase(TestCase, Config).

%% Per-testcase teardown: undoes the case-specific setup, then runs the escalus
%% teardown.
%%  - `shutdown' stops the logger backend started in init_per_testcase/2.
%%  - `client_tries_to_connect_before_listener_stop' unloads all meck mocks on
%%    the MongooseIM node (the mock is installed by block_listener/0).
end_per_testcase(TestCase, Config) ->
    case TestCase of
        shutdown ->
            logger_ct_backend:stop();
        client_tries_to_connect_before_listener_stop ->
            rpc(mim(), meck, unload, []);
        _Other ->
            ok
    end,
    escalus:end_per_testcase(TestCase, Config).

%% Restarts MongooseIM while a client is connected and checks that:
%%  - no captured error log mentions "event_not_registered",
%%  - the client receives a `system-shutdown' stream error and a stream end,
%%  - the client connection is closed within one second.
shutdown(Config) ->
    Spec = escalus_users:get_userspec(Config, geralt_s),
    {ok, Client, _} = escalus_connection:start(Spec),

    %% Capture error-level logs only around the restart
    logger_ct_backend:capture(error),
    ejabberd_node_utils:restart_application(mongooseim),
    logger_ct_backend:stop_capture(),
    MentionsMissingEvent =
        fun(_, Msg) ->
            re:run(Msg, "event_not_registered") /= nomatch
        end,
    [] = logger_ct_backend:recv(MentionsMissingEvent),

    %% Ensure that the client gets a shutdown stanza followed by the stream end
    StreamError = escalus_client:wait_for_stanza(Client),
    escalus:assert(is_stream_error, [<<"system-shutdown">>, <<>>], StreamError),
    StreamEnd = escalus_client:wait_for_stanza(Client),
    escalus:assert(is_stream_end, StreamEnd),
    true = escalus_connection:wait_for_close(Client, timer:seconds(1)).

%% Ensures that a user cannot connect while the listeners and the other c2s
%% processes are being stopped.
client_tries_to_connect_before_listener_stop(Config) ->
    Host = "127.0.0.1",
    Port = proplists:get_value(port, escalus_users:get_userspec(Config, geralt_s)),
    %% Make MongooseIM pause inside the stopping procedure, so the state of the
    %% listeners can be inspected before they are stopped
    block_listener(),
    %% The listener accepts connections before the restart is triggered
    {ok, Socket} = gen_tcp:connect(Host, Port, []),
    %% Run the restart in a separate process, because it blocks in the mock
    Restarter = restart_application_non_blocking(),
    %% From here on the restart is blocked in the mocked mongoose_listener:stop/0
    Blocked = wait_for_called(),
    {error, econnrefused} = gen_tcp:connect(Host, Port, []),
    %% Let the restart continue and stop the listeners
    resume(Blocked),
    %% The connection opened before the restart must be closed
    receive_tcp_closed(Socket),
    %% Wait until mongooseim is fully restarted
    wait_for_down(Restarter).

%% Mocks mongoose_listener:stop/0 on the MongooseIM node so that it first
%% notifies the calling test process, blocks until resume/1 is called, and only
%% then runs the original implementation.
block_listener() ->
    TestPid = self(),
    rpc(mim(), meck, new, [mongoose_listener, [no_link, passthrough]]),
    BlockingStop =
        fun() ->
            wait_for_resume(TestPid),
            meck:passthrough([])
        end,
    rpc(mim(), meck, expect, [mongoose_listener, stop, BlockingStop]).

%% Restarts the mongooseim application in a linked helper process and returns
%% the pid of that process without waiting for the restart to finish.
restart_application_non_blocking() ->
    Restart = fun() -> ejabberd_node_utils:restart_application(mongooseim) end,
    erlang:spawn_link(Restart).

%% Waits up to 5 seconds for the `{called, Called}' notification sent by
%% wait_for_resume/1 and returns `Called'.
%% Fails with `error(wait_for_called_timeout)' if nothing arrives in time.
wait_for_called() ->
    receive
        {called, BlockedCaller} ->
            BlockedCaller
    after 5000 ->
        erlang:error(wait_for_called_timeout)
    end.

%% Blocks the mocked process until resume/1 is called.
%%
%% Sends `{called, {self(), MonRef}}' to `Pid' and then waits for either
%% `{resume, MonRef}' or the termination of `Pid'. `Pid' is monitored so that
%% the calling process is not left blocked forever if `Pid' dies.
%% Returns `ok' in both cases.
wait_for_resume(Pid) ->
    MonRef = erlang:monitor(process, Pid),
    Called = {self(), MonRef},
    Pid ! {called, Called},
    receive
        {'DOWN', MonRef, process, _, _} ->
            ok;
        {resume, MonRef} ->
            %% Drop the monitor (and any 'DOWN' already queued), so the resumed
            %% process does not receive a stray 'DOWN' message when Pid exits.
            true = erlang:demonitor(MonRef, [flush]),
            ok
    end.

%% Tells the process blocked in wait_for_resume/1 to continue.
%% The argument is the `{Pid, MonRef}' pair returned by wait_for_called/0.
resume({BlockedPid, MonRef}) ->
    erlang:send(BlockedPid, {resume, MonRef}).

%% Waits up to 5 seconds for `Pid' to terminate and returns `ok'.
%% On timeout, prints the current stacktrace of `Pid' and fails the test case.
wait_for_down(Pid) ->
    Ref = erlang:monitor(process, Pid),
    receive
        {'DOWN', Ref, process, _Object, _Reason} ->
            ok
    after 5000 ->
        Stacktrace = rpc:pinfo(Pid, current_stacktrace),
        ct:pal("wait_for_down current_stacktrace ~p", [Stacktrace]),
        ct:fail(wait_for_down_timeout)
    end.

%% Waits up to 5 seconds for the `{tcp_closed, ConnPort}' message of the given
%% socket and returns `ok'; fails the test case on timeout.
receive_tcp_closed(ConnPort) ->
    receive
        {tcp_closed, ConnPort} -> ok
    after 5000 ->
        ct:fail(wait_for_tcp_close_timeout)
    end.
2 changes: 1 addition & 1 deletion include/safely.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
-define(SAFELY(F),
try F catch
error:R:S -> {exception, #{class => error, reason => R, stacktrace => S}};
throw:R -> {exception, #{class => throw, reason => R}};
throw:R:S -> {exception, #{class => throw, reason => R, stacktrace => S}};
exit:R:S -> {exception, #{class => exit, reason => R, stacktrace => S}}
end).

Expand Down
1 change: 1 addition & 0 deletions rel/fed1.vars-toml.config
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
{hosts, "\"fed1\", \"fed2\""}.
{default_server_domain, "\"fed1\""}.
{cluster_name, "fed"}.
{redis_database_number, "2"}.

%% domain.example.com is for multitenancy preset, muc_SUITE:register_over_s2s
{s2s_addr, "[[s2s.address]]
Expand Down
1 change: 1 addition & 0 deletions rel/mim1.vars-toml.config
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
{host_types, "\"test type\", \"dummy auth\", \"anonymous\""}.
{default_server_domain, "\"localhost\""}.
{cluster_name, "mim"}.
{redis_database_number, "0"}.

{mod_amp, ""}.
{host_config,
Expand Down
1 change: 1 addition & 0 deletions rel/mim2.vars-toml.config
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
{host_types, "\"test type\", \"dummy auth\""}.
{default_server_domain, "\"localhost\""}.
{cluster_name, "mim"}.
{redis_database_number, "0"}.
{s2s_addr, "[[s2s.address]]
host = \"localhost2\"
ip_address = \"127.0.0.1\""}.
Expand Down
1 change: 1 addition & 0 deletions rel/mim3.vars-toml.config
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
{hosts, "\"localhost\", \"anonymous.localhost\", \"localhost.bis\""}.
{default_server_domain, "\"localhost\""}.
{cluster_name, "mim"}.
{redis_database_number, "0"}.

{s2s_addr, "[[s2s.address]]
host = \"localhost2\"
Expand Down
2 changes: 2 additions & 0 deletions rel/reg1.vars-toml.config
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
{hosts, "\"reg1\", \"localhost\""}.
{default_server_domain, "\"reg1\""}.
{cluster_name, "reg"}.
{redis_database_number, "1"}.

{s2s_addr, "[[s2s.address]]
host = \"localhost\"
Expand Down
48 changes: 6 additions & 42 deletions src/ejabberd_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

-include("mongoose.hrl").


%%%
%%% Application API
%%%
Expand Down Expand Up @@ -86,65 +85,30 @@ do_start() ->
%% @doc Prepare the application for termination.
%% This function is called when an application is about to be stopped,
%% before shutting down the processes of the application.
prep_stop(State) ->
prep_stop(_State) ->
mongoose_deprecations:stop(),
broadcast_c2s_shutdown_listeners(),
StoppedCount = mongoose_listener:suspend_listeners_and_shutdown_connections(),
mongoose_listener:stop(),
mongoose_modules:stop(),
mongoose_service:stop(),
broadcast_c2s_shutdown_sup(),
mongoose_wpool:stop(),
mongoose_graphql_commands:stop(),
mongoose_router:stop(),
mongoose_system_probes:stop(),
State.
#{stopped_count => StoppedCount}.

%% All the processes were killed when this function is called
stop(_State) ->
stop(#{stopped_count := StoppedCount}) ->
mongoose_config:stop(),
?LOG_NOTICE(#{what => mongooseim_node_stopped, version => ?MONGOOSE_VERSION, node => node()}),
?LOG_NOTICE(#{what => mongooseim_node_stopped, version => ?MONGOOSE_VERSION,
node => node(), stopped_sessions_count => StoppedCount}),
delete_pid_file(),
update_status_file(stopped),
%% We cannot stop other applications inside of the stop callback
%% (because we would deadlock the application controller process).
%% That is why we call mnesia:stop() inside of db_init_mnesia() instead.
ok.

%%%
%%% Internal functions
%%%

%% For every C2S listener under mongoose_listener_sup: suspends the listener,
%% asks each of its connection processes to exit with `system_shutdown', and
%% waits for the listener's connection count via mongoose_lib:wait_until/2.
%% NOTE(review): wait_until/2 presumably polls until the fun returns 0 -
%% confirm in mongoose_lib.
-spec broadcast_c2s_shutdown_listeners() -> ok.
broadcast_c2s_shutdown_listeners() ->
    C2SListeners =
        [Ref || {Ref, _, _, [mongoose_c2s_listener]}
                    <- supervisor:which_children(mongoose_listener_sup)],
    ShutdownConnections =
        fun(Listener) ->
            ranch:suspend_listener(Listener),
            lists:foreach(fun(Pid) -> mongoose_c2s:exit(Pid, system_shutdown) end,
                          ranch:procs(Listener, connections)),
            CountConnections =
                fun() -> length(ranch:procs(Listener, connections)) end,
            mongoose_lib:wait_until(CountConnections, 0)
        end,
    lists:foreach(ShutdownConnections, C2SListeners).

%% Asks every child of mongoose_c2s_sup to exit with `system_shutdown', then
%% waits for the supervisor's active child count via mongoose_lib:wait_until/2.
%% NOTE(review): wait_until/2 presumably polls until the fun returns 0 -
%% confirm in mongoose_lib.
-spec broadcast_c2s_shutdown_sup() -> ok.
broadcast_c2s_shutdown_sup() ->
    ExitChild = fun({_, Pid, _, _}) -> mongoose_c2s:exit(Pid, system_shutdown) end,
    lists:foreach(ExitChild, supervisor:which_children(mongoose_c2s_sup)),
    CountActive =
        fun() ->
            Counts = supervisor:count_children(mongoose_c2s_sup),
            proplists:get_value(active, Counts)
        end,
    mongoose_lib:wait_until(CountActive, 0).

%%%
%%% PID file
%%%
Expand Down
3 changes: 2 additions & 1 deletion src/ejabberd_sm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,8 @@ do_route(Acc, From, To, El) ->
do_route_offline(Name, mongoose_acc:stanza_type(Acc),
From, To, Acc, El);
Pid when is_pid(Pid) ->
?LOG_DEBUG(#{what => sm_route_to_pid, session_pid => Pid, acc => Acc}),
?LOG_DEBUG(#{what => sm_route_to_pid, session_pid => Pid,
session_node => node(Pid), acc => Acc}),
mongoose_c2s:route(Pid, Acc),
Acc
end
Expand Down
Loading

0 comments on commit a8aa910

Please sign in to comment.