From f333be92fe50455fef866be922dd344cacfdff39 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 19 Nov 2024 10:00:06 +0100 Subject: [PATCH 01/13] Reproduce event_not_registered error on MongooseIM shutdown And websocket clients --- big_tests/default.spec | 1 + big_tests/dynamic_domains.spec | 1 + big_tests/tests/shutdown_SUITE.erl | 48 ++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+) create mode 100644 big_tests/tests/shutdown_SUITE.erl diff --git a/big_tests/default.spec b/big_tests/default.spec index ed2070c1aad..b88352f08d1 100644 --- a/big_tests/default.spec +++ b/big_tests/default.spec @@ -121,6 +121,7 @@ {suites, "tests", cets_disco_SUITE}. {suites, "tests", start_node_id_SUITE}. {suites, "tests", tr_util_SUITE}. +{suites, "tests", shutdown_SUITE}. {config, ["test.config"]}. {logdir, "ct_report"}. diff --git a/big_tests/dynamic_domains.spec b/big_tests/dynamic_domains.spec index c451bbc6da9..3662dcd9760 100644 --- a/big_tests/dynamic_domains.spec +++ b/big_tests/dynamic_domains.spec @@ -163,6 +163,7 @@ {suites, "tests", cets_disco_SUITE}. {suites, "tests", start_node_id_SUITE}. {suites, "tests", tr_util_SUITE}. +{suites, "tests", shutdown_SUITE}. {config, ["dynamic_domains.config", "test.config"]}. diff --git a/big_tests/tests/shutdown_SUITE.erl b/big_tests/tests/shutdown_SUITE.erl new file mode 100644 index 00000000000..cd187478f38 --- /dev/null +++ b/big_tests/tests/shutdown_SUITE.erl @@ -0,0 +1,48 @@ +-module(shutdown_SUITE). + +-compile([export_all, nowarn_export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("exml/include/exml.hrl"). + +all() -> + [{group, main}]. + +groups() -> + [{main, [parallel], [shutdown]}]. + +init_per_suite(Config) -> + escalus:init_per_suite(Config). + +end_per_suite(Config) -> + escalus:end_per_suite(Config). + +init_per_group(_, Config) -> + escalus:create_users(Config, escalus:get_users([geralt_s, alice])), + Config. + +end_per_group(_, Config) -> + escalus:delete_users(Config, escalus:get_users([geralt_s, alice])), + Config. + +init_per_testcase(shutdown = TC, Config) -> + logger_ct_backend:start(), + escalus:init_per_testcase(TC, Config); +init_per_testcase(Name, Config) -> + escalus:init_per_testcase(Name, Config). + +end_per_testcase(shutdown = TC, Config) -> + logger_ct_backend:stop(), + escalus:end_per_testcase(TC, Config); +end_per_testcase(Name, Config) -> + escalus:end_per_testcase(Name, Config). + +shutdown(Config) -> + UserSpec = escalus_users:get_userspec(Config, geralt_s), + {ok, _Alice, _} = escalus_connection:start(UserSpec), + logger_ct_backend:capture(error), + ejabberd_node_utils:restart_application(mongooseim), + logger_ct_backend:stop_capture(), + FilterFun = fun(_, WMsg) -> re:run(WMsg, "event_not_registered") /= nomatch end, + [] = logger_ct_backend:recv(FilterFun). From 31419a9b3d6f811ca2b26f5ab5634abdcc5faa13 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 19 Nov 2024 22:42:12 +0100 Subject: [PATCH 02/13] Shutdown c2s processes based on c2s supervisor data first Disallow new connections by calling suspend_listeners Call broadcast_c2s_shutdown_sup Call mongoose_listener:stop This should unify behaviour of c2s and websockets when shutting down the node --- big_tests/tests/shutdown_SUITE.erl | 85 +++++++++++++++++++++++++++++- src/ejabberd_app.erl | 25 ++++----- 2 files changed, 93 insertions(+), 17 deletions(-) diff --git a/big_tests/tests/shutdown_SUITE.erl b/big_tests/tests/shutdown_SUITE.erl index cd187478f38..36a464b478f 100644 --- a/big_tests/tests/shutdown_SUITE.erl +++ b/big_tests/tests/shutdown_SUITE.erl @@ -1,6 +1,7 @@ -module(shutdown_SUITE). -compile([export_all, nowarn_export_all]). +-import(distributed_helper, [mim/0, rpc/4]). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -10,9 +11,14 @@ all() -> [{group, main}]. groups() -> - [{main, [parallel], [shutdown]}]. + [{main, [], cases()}]. + +cases() -> + [shutdown, + client_tries_to_connect_before_listener_stop]. init_per_suite(Config) -> + mongoose_helper:inject_module(?MODULE), escalus:init_per_suite(Config). end_per_suite(Config) -> @@ -29,12 +35,17 @@ end_per_group(_, Config) -> init_per_testcase(shutdown = TC, Config) -> logger_ct_backend:start(), escalus:init_per_testcase(TC, Config); +init_per_testcase(client_tries_to_connect_before_listener_stop = TC, Config) -> + escalus:init_per_testcase(TC, Config); init_per_testcase(Name, Config) -> escalus:init_per_testcase(Name, Config). end_per_testcase(shutdown = TC, Config) -> logger_ct_backend:stop(), escalus:end_per_testcase(TC, Config); +end_per_testcase(client_tries_to_connect_before_listener_stop = TC, Config) -> + rpc(mim(), meck, unload, []), + escalus:end_per_testcase(TC, Config); end_per_testcase(Name, Config) -> escalus:end_per_testcase(Name, Config). @@ -46,3 +57,75 @@ shutdown(Config) -> logger_ct_backend:stop_capture(), FilterFun = fun(_, WMsg) -> re:run(WMsg, "event_not_registered") /= nomatch end, [] = logger_ct_backend:recv(FilterFun). + +client_tries_to_connect_before_listener_stop(Config) -> + %% Ensures that user would not be able to connect while we are stopping + %% the listeners and other c2s processes + UserSpec = escalus_users:get_userspec(Config, geralt_s), + PortNum = proplists:get_value(port, UserSpec), + %% Ask MongooseIM to pause the stopping process + %% so we can check that listeners were suspended correctly + block_listener(), + %% Check that the listener is working + {ok, ConnPort} = gen_tcp:connect("127.0.0.1", PortNum, []), + %% Trigger the restarting logic in a separate parallel process + RestPid = restart_application_non_blocking(), + %% Wait until we are blocked in mongoose_listener:stop/0 + Called = wait_for_called(), + {error, econnrefused} = gen_tcp:connect("127.0.0.1", PortNum, []), + %% Resume to stop the listeners + resume(Called), + %% Check that the old TCP connections are closed + receive_tcp_closed(ConnPort), + %% Wait till mongooseim is fully restarted + wait_for_down(RestPid). + +block_listener() -> + rpc(mim(), meck, new, [mongoose_listener, [no_link, passthrough]]), + Pid = self(), + F = fun() -> + wait_for_resume(Pid), + meck:passthrough([]) + end, + rpc(mim(), meck, expect, [mongoose_listener, stop, F]). + +restart_application_non_blocking() -> + spawn_link(fun() -> ejabberd_node_utils:restart_application(mongooseim) end). + +wait_for_called() -> + receive + {called, Called} -> + Called + after 5000 -> + error(wait_for_called_timeout) + end. + +%% Blocks the mocked process until resume/1 is called +wait_for_resume(Pid) -> + MonRef = erlang:monitor(process, Pid), + Called = {self(), MonRef}, + Pid ! {called, Called}, + receive + {'DOWN', MonRef, process, _, _} -> ok; + {resume, MonRef} -> ok + end. + +%% Command to the mocked process to resume the operation +resume({Pid, MonRef}) -> + Pid ! {resume, MonRef}. + +wait_for_down(Pid) -> + MonRef = erlang:monitor(process, Pid), + receive + {'DOWN', MonRef, process, _, _} -> ok + after 5000 -> ct:fail(wait_for_down_timeout) + end. + +%% Waits until the TCP socket is closed +receive_tcp_closed(ConnPort) -> + receive + {tcp_closed, ConnPort} -> + ok + after 5000 -> + ct:fail(wait_for_tcp_close_timeout) + end. diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index 0939a6d07de..353880ef750 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -88,11 +88,11 @@ do_start() -> %% before shutting down the processes of the application. prep_stop(State) -> mongoose_deprecations:stop(), - broadcast_c2s_shutdown_listeners(), + suspend_listeners(ranch_listeners()), + broadcast_c2s_shutdown_sup(), mongoose_listener:stop(), mongoose_modules:stop(), mongoose_service:stop(), - broadcast_c2s_shutdown_sup(), mongoose_wpool:stop(), mongoose_graphql_commands:stop(), mongoose_router:stop(), @@ -114,21 +114,14 @@ stop(_State) -> %%% Internal functions %%% --spec broadcast_c2s_shutdown_listeners() -> ok. -broadcast_c2s_shutdown_listeners() -> +suspend_listeners(Listeners) -> + lists:foreach(fun ranch:suspend_listener/1, Listeners). + +ranch_listeners() -> Children = supervisor:which_children(mongoose_listener_sup), - Listeners = [Ref || {Ref, _, _, [mongoose_c2s_listener]} <- Children], - lists:foreach( - fun(Listener) -> - ranch:suspend_listener(Listener), - [mongoose_c2s:exit(Pid, system_shutdown) || Pid <- ranch:procs(Listener, connections)], - mongoose_lib:wait_until( - fun() -> - length(ranch:procs(Listener, connections)) - end, - 0) - end, - Listeners). + Listeners1 = [ejabberd_cowboy:ref(Listener) || {Listener, _, _, [ejabberd_cowboy]} <- Children], + Listeners2 = [Ref || {Ref, _, _, [mongoose_c2s_listener]} <- Children], + Listeners1 ++ Listeners2. -spec broadcast_c2s_shutdown_sup() -> ok. broadcast_c2s_shutdown_sup() -> From 8d032a30fa5dc0bb5db136ecfa46611d19d160da Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Tue, 19 Nov 2024 22:48:55 +0100 Subject: [PATCH 03/13] Ensure that the client gets a system-shutdown message --- big_tests/tests/shutdown_SUITE.erl | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/big_tests/tests/shutdown_SUITE.erl b/big_tests/tests/shutdown_SUITE.erl index 36a464b478f..14977b32ccf 100644 --- a/big_tests/tests/shutdown_SUITE.erl +++ b/big_tests/tests/shutdown_SUITE.erl @@ -51,12 +51,17 @@ end_per_testcase(Name, Config) -> shutdown(Config) -> UserSpec = escalus_users:get_userspec(Config, geralt_s), - {ok, _Alice, _} = escalus_connection:start(UserSpec), + {ok, Alice, _} = escalus_connection:start(UserSpec), logger_ct_backend:capture(error), ejabberd_node_utils:restart_application(mongooseim), logger_ct_backend:stop_capture(), - FilterFun = fun(_, WMsg) -> re:run(WMsg, "event_not_registered") /= nomatch end, - [] = logger_ct_backend:recv(FilterFun). + FilterFun = fun(_, Msg) -> re:run(Msg, "event_not_registered") /= nomatch end, + [] = logger_ct_backend:recv(FilterFun), + %% Ensure that Alice gets a shutdown stanza + escalus:assert(is_stream_error, [<<"system-shutdown">>, <<>>], + escalus_client:wait_for_stanza(Alice)), + escalus:assert(is_stream_end, escalus_client:wait_for_stanza(Alice)), + true = escalus_connection:wait_for_close(Alice, timer:seconds(1)). client_tries_to_connect_before_listener_stop(Config) -> %% Ensures that user would not be able to connect while we are stopping From 285addf81735d8aeb434888ebcd29babecffb692 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 29 Nov 2024 00:37:10 +0100 Subject: [PATCH 04/13] Add redis_database_number to separate ejabberd_sm_redis databases mod_global_distrib_SUITE:test_location_disconnect fails trying to route stanza to pid from another cluster (i.e. fed1). So, it just looses a message instead of giving 503 error --- big_tests/test.config | 1 + rel/fed1.vars-toml.config | 1 + rel/mim1.vars-toml.config | 1 + rel/mim2.vars-toml.config | 1 + rel/mim3.vars-toml.config | 1 + rel/reg1.vars-toml.config | 2 ++ 6 files changed, 7 insertions(+) diff --git a/big_tests/test.config b/big_tests/test.config index 54908859679..dc52d09b207 100644 --- a/big_tests/test.config +++ b/big_tests/test.config @@ -348,6 +348,7 @@ scope = \"global\" workers = 10 strategy = \"random_worker\" + connection.database = {{redis_database_number}} [outgoing_pools.rdbms.default] scope = \"global\" workers = 5 diff --git a/rel/fed1.vars-toml.config b/rel/fed1.vars-toml.config index 3f15efe96bc..c90682fc67c 100644 --- a/rel/fed1.vars-toml.config +++ b/rel/fed1.vars-toml.config @@ -19,6 +19,7 @@ {hosts, "\"fed1\", \"fed2\""}. {default_server_domain, "\"fed1\""}. {cluster_name, "fed"}. +{redis_database_number, "2"}. %% domain.example.com is for multitenancy preset, muc_SUITE:register_over_s2s {s2s_addr, "[[s2s.address]] diff --git a/rel/mim1.vars-toml.config b/rel/mim1.vars-toml.config index 6af5aba5bc2..ec4dd56a72b 100644 --- a/rel/mim1.vars-toml.config +++ b/rel/mim1.vars-toml.config @@ -22,6 +22,7 @@ {host_types, "\"test type\", \"dummy auth\", \"anonymous\""}. {default_server_domain, "\"localhost\""}. {cluster_name, "mim"}. +{redis_database_number, "0"}. {mod_amp, ""}. {host_config, diff --git a/rel/mim2.vars-toml.config b/rel/mim2.vars-toml.config index e72db987632..d0b0d45f93e 100644 --- a/rel/mim2.vars-toml.config +++ b/rel/mim2.vars-toml.config @@ -20,6 +20,7 @@ {host_types, "\"test type\", \"dummy auth\""}. {default_server_domain, "\"localhost\""}. {cluster_name, "mim"}. +{redis_database_number, "0"}. {s2s_addr, "[[s2s.address]] host = \"localhost2\" ip_address = \"127.0.0.1\""}. diff --git a/rel/mim3.vars-toml.config b/rel/mim3.vars-toml.config index a062094a919..9f14b599390 100644 --- a/rel/mim3.vars-toml.config +++ b/rel/mim3.vars-toml.config @@ -23,6 +23,7 @@ {hosts, "\"localhost\", \"anonymous.localhost\", \"localhost.bis\""}. {default_server_domain, "\"localhost\""}. {cluster_name, "mim"}. +{redis_database_number, "0"}. {s2s_addr, "[[s2s.address]] host = \"localhost2\" diff --git a/rel/reg1.vars-toml.config b/rel/reg1.vars-toml.config index 9462854da8c..acca201257f 100644 --- a/rel/reg1.vars-toml.config +++ b/rel/reg1.vars-toml.config @@ -23,6 +23,8 @@ {hosts, "\"reg1\", \"localhost\""}. {default_server_domain, "\"reg1\""}. {cluster_name, "reg"}. +{cluster_name, "reg"}. +{redis_database_number, "1"}. {s2s_addr, "[[s2s.address]] host = \"localhost\" From 4f880376fe2adc5ea5448bd007eefdbf308fcb65 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 29 Nov 2024 00:38:00 +0100 Subject: [PATCH 05/13] Log make_error_reply using debug log level Useful to debug when some errors happen --- src/jlib.erl | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/jlib.erl b/src/jlib.erl index 7813c64d0fa..fa56fab7f00 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -120,14 +120,16 @@ make_result_iq_reply_attrs(Attrs) -> -spec make_error_reply(exml:element() | mongoose_acc:t(), xmlcdata() | exml:element()) -> exml:element() | {mongoose_acc:t(), exml:element() | {error, {already_an_error, _, _}}}. -make_error_reply(#xmlel{name = Name, attrs = Attrs, - children = SubTags}, Error) -> - NewAttrs = make_error_reply_attrs(Attrs), - #xmlel{name = Name, attrs = NewAttrs, children = SubTags ++ [Error]}; +make_error_reply(#xmlel{} = Elem, Error) -> + ?LOG_DEBUG(#{what => make_error_reply, + exml_packet => Elem, error_element => Error}), + make_error_reply_from_element(Elem, Error); make_error_reply(Acc, Error) -> make_error_reply(Acc, mongoose_acc:element(Acc), Error). make_error_reply(Acc, Packet, Error) -> + ?LOG_DEBUG(#{what => make_error_reply, + acc => Acc, error_element => Error}), case mongoose_acc:get(flag, error, false, Acc) of true -> ?LOG_ERROR(#{what => error_reply_to_error, exml_packet => Packet, @@ -135,9 +137,14 @@ make_error_reply(Acc, Packet, Error) -> {Acc, {error, {already_an_error, Packet, Error}}}; _ -> {mongoose_acc:set(flag, error, true, Acc), - make_error_reply(Packet, Error)} + make_error_reply_from_element(Packet, Error)} end. +make_error_reply_from_element(#xmlel{name = Name, attrs = Attrs, + children = SubTags}, Error) -> + NewAttrs = make_error_reply_attrs(Attrs), + #xmlel{name = Name, attrs = NewAttrs, children = SubTags ++ [Error]}. + -spec make_error_reply_attrs([binary_pair()]) -> [binary_pair(), ...]. make_error_reply_attrs(Attrs) -> To = xml:get_attr(<<"to">>, Attrs), From 72b49daf876a2b8c7563f75eecd30c4626914da9 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 29 Nov 2024 00:38:36 +0100 Subject: [PATCH 06/13] Add stacktrace into SAFELY macro for throws --- include/safely.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/safely.hrl b/include/safely.hrl index b9b07a9504a..f79cb2ae36d 100644 --- a/include/safely.hrl +++ b/include/safely.hrl @@ -4,7 +4,7 @@ -define(SAFELY(F), try F catch error:R:S -> {exception, #{class => error, reason => R, stacktrace => S}}; - throw:R -> {exception, #{class => throw, reason => R}}; + throw:R:S -> {exception, #{class => throw, reason => R, stacktrace => S}}; exit:R:S -> {exception, #{class => exit, reason => R, stacktrace => S}} end). From bab1edb341d0c4ead1924d53cb7f5fa6793e1f60 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 29 Nov 2024 00:39:05 +0100 Subject: [PATCH 07/13] Log pid node in sm_route_to_pid debug log message --- src/ejabberd_sm.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index cd80c06b055..bdecab80698 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -623,7 +623,8 @@ do_route(Acc, From, To, El) -> do_route_offline(Name, mongoose_acc:stanza_type(Acc), From, To, Acc, El); Pid when is_pid(Pid) -> - ?LOG_DEBUG(#{what => sm_route_to_pid, session_pid => Pid, acc => Acc}), + ?LOG_DEBUG(#{what => sm_route_to_pid, session_pid => Pid, + session_node => node(Pid), acc => Acc}), mongoose_c2s:route(Pid, Acc), Acc end From dda610ccb67286d612d6fb0f38f0005f3e7c3a49 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 29 Nov 2024 00:40:39 +0100 Subject: [PATCH 08/13] Rename mod_global_distrib:maybe_reroute to filter_packet Hook callbacks should match the hook names --- src/global_distrib/mod_global_distrib.erl | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/global_distrib/mod_global_distrib.erl b/src/global_distrib/mod_global_distrib.erl index d20a2324a75..3f0db76b3f5 100644 --- a/src/global_distrib/mod_global_distrib.erl +++ b/src/global_distrib/mod_global_distrib.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%============================================================================== -%% Intercepts filter_packet hook with maybe_reroute callback. +%% Intercepts filter_packet hook. -module(mod_global_distrib). -author('konrad.zemek@erlang-solutions.com'). @@ -28,7 +28,7 @@ -export([deps/2, start/2, stop/1, config_spec/0, instrumentation/0]). -export([find_metadata/2, get_metadata/3, remove_metadata/2, put_metadata/3]). --export([maybe_reroute/3]). +-export([filter_packet/3]). -export([process_opts/1, process_endpoint/1]). -ignore_xref([remove_metadata/2, instrumentation/0]). @@ -77,7 +77,10 @@ instrumentation() -> {?GLOBAL_DISTRIB_STOP_TTL_ZERO, #{}, #{metrics => #{count => spiral}}}]. hooks() -> - [{filter_packet, global, fun ?MODULE:maybe_reroute/3, #{}, 99}]. + %% filter_packet is called in mongoose_router_global as a first + %% element of the routing chain. + %% mongoose_router_localdomain is called next. + [{filter_packet, global, fun ?MODULE:filter_packet/3, #{}, 99}]. -spec config_spec() -> mongoose_config_spec:config_section(). config_spec() -> @@ -229,11 +232,11 @@ remove_metadata(Acc, Key) -> %% Hooks implementation %%-------------------------------------------------------------------- --spec maybe_reroute(FPacket, Params, Extra) -> {ok, FPacket} | {stop, drop} when +-spec filter_packet(FPacket, Params, Extra) -> {ok, FPacket} | {stop, drop} when FPacket :: mongoose_hooks:filter_packet_acc(), Params :: map(), Extra :: map(). -maybe_reroute({#jid{ luser = SameUser, lserver = SameServer } = _From, +filter_packet({#jid{ luser = SameUser, lserver = SameServer } = _From, #jid{ luser = SameUser, lserver = SameServer } = _To, _Acc, _Packet} = FPacket, _, _) -> %% GD is not designed to support two user sessions existing in distinct clusters @@ -244,7 +247,7 @@ maybe_reroute({#jid{ luser = SameUser, lserver = SameServer } = _From, %% from unacked SM buffer, leading to an error, while a brand new, shiny Eve %% on mim1 was waiting. {ok, FPacket}; -maybe_reroute({From, To, _, Packet} = FPacket, _, _) -> +filter_packet({From, To, _, Packet} = FPacket, _, _) -> Acc = maybe_initialize_metadata(FPacket), {ok, ID} = find_metadata(Acc, id), LocalHost = opt(local_host), From eb6c12d08083d17908915da2a2dc1dc21d825667 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 29 Nov 2024 00:41:49 +0100 Subject: [PATCH 09/13] Add extra logging in mod_global_distrib_SUITE To debug harder cases --- big_tests/tests/mod_global_distrib_SUITE.erl | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/big_tests/tests/mod_global_distrib_SUITE.erl b/big_tests/tests/mod_global_distrib_SUITE.erl index 217ad0cf12e..4674a6fc800 100644 --- a/big_tests/tests/mod_global_distrib_SUITE.erl +++ b/big_tests/tests/mod_global_distrib_SUITE.erl @@ -697,7 +697,6 @@ test_location_disconnect(Config) -> ok = rpc(asia_node, application, stop, [mongooseim]), %% TODO: Stopping mongooseim alone should probably stop connections too ok = rpc(asia_node, application, stop, [ranch]), - escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)), Error = escalus:wait_for_stanza(Alice), escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error) @@ -1433,10 +1432,14 @@ custom_loglevels() -> {mod_global_distrib_connection, debug}, %% to check if gc or refresh is triggered {mod_global_distrib_server_mgr, info}, - %% To debug incoming connections + %% To debug incoming connections % {mod_global_distrib_receiver, info}, - %% to debug global session set/delete - {mod_global_distrib_mapping, debug} + %% to debug global session set/delete + {mod_global_distrib_mapping, debug}, + %% To log make_error_reply calls + {jlib, debug}, + %% to log sm_route + {ejabberd_sm, debug} ]. test_hosts() -> [mim, mim2, reg]. From c76a14d842a456e35501c258abecdb27de5fc890 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 29 Nov 2024 02:50:29 +0100 Subject: [PATCH 10/13] Test websockets connections in test_component_disconnect too --- big_tests/test.config | 7 ++++ big_tests/tests/mod_global_distrib_SUITE.erl | 37 +++++++++++++++++--- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/big_tests/test.config b/big_tests/test.config index dc52d09b207..f1b2eb11ed8 100644 --- a/big_tests/test.config +++ b/big_tests/test.config @@ -198,6 +198,13 @@ {server, <<"localhost">>}, {password, <<"password">>}, {port, 5252}]}, + {adam, [ %% used in mod_global_distrib_SUITE, websockets + {username, <<"adam">>}, + {server, <<"localhost">>}, + {password, <<"password">>}, + {transport, escalus_ws}, + {port, 5272}, + {wspath, <<"/ws-xmpp">>}]}, {neustradamus, [ {username, <<"neustradamus">>}, {server, <<"localhost">>}, diff --git a/big_tests/tests/mod_global_distrib_SUITE.erl b/big_tests/tests/mod_global_distrib_SUITE.erl index 4674a6fc800..4c9c6aefd50 100644 --- a/big_tests/tests/mod_global_distrib_SUITE.erl +++ b/big_tests/tests/mod_global_distrib_SUITE.erl @@ -688,18 +688,26 @@ test_component_disconnect(Config) -> test_location_disconnect(Config) -> try escalus:fresh_story( - Config, [{alice, 1}, {eve, 1}], - fun(Alice, Eve) -> + Config, [{alice, 1}, {eve, 1}, {adam, 1}], + fun(Alice, Eve, Adam) -> escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)), - escalus_client:wait_for_stanza(Eve), + escalus_client:send(Alice, escalus_stanza:chat_to(Adam, <<"Hi, Adam, from Europe1!">>)), + escalus_client:wait_for_stanza(Adam), + + print_sessions_debug_info(asia_node), ok = rpc(asia_node, application, stop, [mongooseim]), %% TODO: Stopping mongooseim alone should probably stop connections too ok = rpc(asia_node, application, stop, [ranch]), + escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)), Error = escalus:wait_for_stanza(Alice), - escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error) + escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error), + + escalus_client:send(Alice, escalus_stanza:chat_to(Adam, <<"Hi, Adam, again!">>)), + Error2 = escalus:wait_for_stanza(Alice), + escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error2) end) after rpc(asia_node, application, start, [ranch]), @@ -1409,6 +1417,27 @@ can_connect_to_port(Port) -> false end. +%% Prints information about the active sessions +print_sessions_debug_info(NodeName) -> + Node = rpc(NodeName, erlang, node, []), + Nodes = rpc(NodeName, erlang, nodes, []), + ct:log("name=~p, erlang_node=~p, other_nodes=~p", [NodeName, Node, Nodes]), + + Children = rpc(NodeName, supervisor, which_children, [mongoose_c2s_sup]), + ct:log("C2S processes under a supervisour ~p", [Children]), + + Sessions = rpc(NodeName, ejabberd_sm, get_full_session_list, []), + ct:log("C2S processes in the session manager ~p", [Sessions]), + + Sids = [element(2, Session) || Session <- Sessions], + Pids = [Pid || {_, Pid} <- Sids], + PidNodes = [{Pid, node(Pid)} || Pid <- Pids], + ct:log("Pids on nodes ~p", [PidNodes]), + + Info = [{Pid, rpc:call(Node, erlang, process_info, [Pid])} || {Pid, Node} <- PidNodes], + ct:log("Processes info ~p", [Info]), + ok. + %% ----------------------------------------------------------------------- %% Custom log levels for GD modules during the tests From d44b5f7073bdf41a9d90984eab8695fbc1c20cb1 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 29 Nov 2024 02:50:57 +0100 Subject: [PATCH 11/13] Log removed_sessions_count in sm_cleanup_initial log notice --- src/ejabberd_sm_redis.erl | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/ejabberd_sm_redis.erl b/src/ejabberd_sm_redis.erl index 39f7e7f52f9..fbe584a7076 100644 --- a/src/ejabberd_sm_redis.erl +++ b/src/ejabberd_sm_redis.erl @@ -26,14 +26,15 @@ -ignore_xref([maybe_initial_cleanup/2]). --spec init(map()) -> any(). +-spec init(map()) -> ok. init(_Opts) -> %% Clean current node's sessions from previous life - {Elapsed, RetVal} = timer:tc(?MODULE, maybe_initial_cleanup, [node(), true]), + {Elapsed, Count} = timer:tc(?MODULE, maybe_initial_cleanup, [node(), true]), ?LOG_NOTICE(#{what => sm_cleanup_initial, text => <<"SM cleanup on start took">>, - duration => erlang:round(Elapsed / 1000)}), - RetVal. + duration => erlang:round(Elapsed / 1000), + removed_sessions_count => Count}), + ok. -spec get_sessions() -> [ejabberd_sm:session()]. get_sessions() -> @@ -109,11 +110,12 @@ delete_session(SID, User, Server, Resource) -> ok end. --spec cleanup(atom()) -> any(). +-spec cleanup(atom()) -> ok. cleanup(Node) -> - maybe_initial_cleanup(Node, false). + maybe_initial_cleanup(Node, false), + ok. --spec maybe_initial_cleanup(atom(), boolean()) -> any(). +-spec maybe_initial_cleanup(atom(), boolean()) -> non_neg_integer(). maybe_initial_cleanup(Node, Initial) -> Hashes = mongoose_redis:cmd(["SMEMBERS", n(Node)]), mongoose_redis:cmd(["DEL", n(Node)]), @@ -131,7 +133,8 @@ maybe_initial_cleanup(Node, Initial) -> Session end end, Hashes), - ejabberd_sm:sessions_cleanup(Sessions). + ejabberd_sm:sessions_cleanup(Sessions), + length(Sessions). -spec total_count() -> integer(). total_count() -> From 0a6f37d6849e11da2d810dc71c84cdd17f81250a Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Fri, 29 Nov 2024 02:51:58 +0100 Subject: [PATCH 12/13] Use ranch draining for regular connections Use c2s info for websocket connections --- src/ejabberd_app.erl | 64 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index 353880ef750..d6431b2a58b 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -34,6 +34,8 @@ -include("mongoose.hrl"). +-type typed_listeners() :: [{Type :: ranch | cowboy, Listener :: ranch:ref()}]. + %%% %%% Application API @@ -86,10 +88,12 @@ do_start() -> %% @doc Prepare the application for termination. %% This function is called when an application is about to be stopped, %% before shutting down the processes of the application. -prep_stop(State) -> +prep_stop(_State) -> mongoose_deprecations:stop(), - suspend_listeners(ranch_listeners()), - broadcast_c2s_shutdown_sup(), + TypedListeners = get_typed_listeners(), + suspend_listeners(TypedListeners), + StoppedCount = broadcast_c2s_shutdown_sup(), + StoppedCount2 = broadcast_c2s_shutdown_to_regular_c2s_connections(TypedListeners), mongoose_listener:stop(), mongoose_modules:stop(), mongoose_service:stop(), @@ -97,12 +101,13 @@ prep_stop(State) -> mongoose_graphql_commands:stop(), mongoose_router:stop(), mongoose_system_probes:stop(), - State. + #{stopped_count => StoppedCount + StoppedCount2}. %% All the processes were killed when this function is called -stop(_State) -> +stop(#{stopped_count := StoppedCount}) -> mongoose_config:stop(), - ?LOG_NOTICE(#{what => mongooseim_node_stopped, version => ?MONGOOSE_VERSION, node => node()}), + ?LOG_NOTICE(#{what => mongooseim_node_stopped, version => ?MONGOOSE_VERSION, + node => node(), stopped_sessions_count => StoppedCount}), delete_pid_file(), update_status_file(stopped), %% We cannot stop other applications inside of the stop callback @@ -114,17 +119,32 @@ stop(_State) -> %%% Internal functions %%% -suspend_listeners(Listeners) -> - lists:foreach(fun ranch:suspend_listener/1, Listeners). +-spec suspend_listeners(typed_listeners()) -> ok. +suspend_listeners(TypedListeners) -> + [ranch:suspend_listener(Ref) || {_Type, Ref} <- TypedListeners], + ok. -ranch_listeners() -> +-spec get_typed_listeners() -> typed_listeners(). +get_typed_listeners() -> Children = supervisor:which_children(mongoose_listener_sup), - Listeners1 = [ejabberd_cowboy:ref(Listener) || {Listener, _, _, [ejabberd_cowboy]} <- Children], - Listeners2 = [Ref || {Ref, _, _, [mongoose_c2s_listener]} <- Children], + Listeners1 = [{cowboy, ejabberd_cowboy:ref(Listener)} + || {Listener, _, _, [ejabberd_cowboy]} <- Children], + Listeners2 = [{ranch, Ref} + || {Ref, _, _, [mongoose_c2s_listener]} <- Children], Listeners1 ++ Listeners2. --spec broadcast_c2s_shutdown_sup() -> ok. +-spec broadcast_c2s_shutdown_sup() -> StoppedCount :: non_neg_integer(). broadcast_c2s_shutdown_sup() -> + %% Websocket c2s connections have two processes per user: + %% - one is websocket Cowboy process. + %% - one is under mongoose_c2s_sup. + %% + %% Regular XMPP connections are not under mongoose_c2s_sup, + %% they are under the Ranch listener, which is a child of mongoose_listener_sup. + %% + %% We could use ejabberd_sm to get both Websocket and regular XMPP sessions, + %% but waiting till the list size is zero is much more computationally + %% expensive in that case. Children = supervisor:which_children(mongoose_c2s_sup), lists:foreach( fun({_, Pid, _, _}) -> @@ -136,7 +156,25 @@ broadcast_c2s_shutdown_sup() -> Res = supervisor:count_children(mongoose_c2s_sup), proplists:get_value(active, Res) end, - 0). + 0), + length(Children). + +%% Based on https://ninenines.eu/docs/en/ranch/2.1/guide/connection_draining/ +-spec broadcast_c2s_shutdown_to_regular_c2s_connections(typed_listeners()) -> + non_neg_integer(). +broadcast_c2s_shutdown_to_regular_c2s_connections(TypedListeners) -> + Refs = [Ref || {ranch, Ref} <- TypedListeners], + StoppedCount = lists:foldl( + fun(Ref, Count) -> + Conns = ranch:procs(Ref, connections), + [mongoose_c2s:exit(Pid, system_shutdown) || Pid <- Conns], + length(Conns) + Count + end, 0, Refs), + lists:foreach( + fun(Ref) -> + ok = ranch:wait_for_connections(Ref, '==', 0) + end, Refs), + StoppedCount. %%% %%% PID file From 87f7bf4b05a5630b6087b4c70438c34b3939b997 Mon Sep 17 00:00:00 2001 From: Mikhail Uvarov Date: Mon, 2 Dec 2024 10:29:37 +0100 Subject: [PATCH 13/13] Move listener stop into mongoose_listener:suspend_listeners_and_shutdown_connections --- big_tests/tests/shutdown_SUITE.erl | 5 ++- src/ejabberd_app.erl | 71 +----------------------------- src/mongoose_listener.erl | 66 +++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 70 deletions(-) diff --git a/big_tests/tests/shutdown_SUITE.erl b/big_tests/tests/shutdown_SUITE.erl index 14977b32ccf..dcfd5423ece 100644 --- a/big_tests/tests/shutdown_SUITE.erl +++ b/big_tests/tests/shutdown_SUITE.erl @@ -123,7 +123,10 @@ wait_for_down(Pid) -> MonRef = erlang:monitor(process, Pid), receive {'DOWN', MonRef, process, _, _} -> ok - after 5000 -> ct:fail(wait_for_down_timeout) + after 5000 -> + ct:pal("wait_for_down current_stacktrace ~p", + [rpc:pinfo(Pid, current_stacktrace)]), + ct:fail(wait_for_down_timeout) end. %% Waits until the TCP socket is closed diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index d6431b2a58b..877d82ebe1f 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -34,9 +34,6 @@ -include("mongoose.hrl"). --type typed_listeners() :: [{Type :: ranch | cowboy, Listener :: ranch:ref()}]. - - %%% %%% Application API %%% @@ -90,10 +87,7 @@ do_start() -> %% before shutting down the processes of the application. prep_stop(_State) -> mongoose_deprecations:stop(), - TypedListeners = get_typed_listeners(), - suspend_listeners(TypedListeners), - StoppedCount = broadcast_c2s_shutdown_sup(), - StoppedCount2 = broadcast_c2s_shutdown_to_regular_c2s_connections(TypedListeners), + StoppedCount = mongoose_listener:suspend_listeners_and_shutdown_connections(), mongoose_listener:stop(), mongoose_modules:stop(), mongoose_service:stop(), @@ -101,7 +95,7 @@ prep_stop(_State) -> mongoose_graphql_commands:stop(), mongoose_router:stop(), mongoose_system_probes:stop(), - #{stopped_count => StoppedCount + StoppedCount2}. + #{stopped_count => StoppedCount}. %% All the processes were killed when this function is called stop(#{stopped_count := StoppedCount}) -> @@ -115,67 +109,6 @@ stop(#{stopped_count := StoppedCount}) -> %% That is why we call mnesia:stop() inside of db_init_mnesia() instead. ok. -%%% -%%% Internal functions -%%% - --spec suspend_listeners(typed_listeners()) -> ok. -suspend_listeners(TypedListeners) -> - [ranch:suspend_listener(Ref) || {_Type, Ref} <- TypedListeners], - ok. - --spec get_typed_listeners() -> typed_listeners(). -get_typed_listeners() -> - Children = supervisor:which_children(mongoose_listener_sup), - Listeners1 = [{cowboy, ejabberd_cowboy:ref(Listener)} - || {Listener, _, _, [ejabberd_cowboy]} <- Children], - Listeners2 = [{ranch, Ref} - || {Ref, _, _, [mongoose_c2s_listener]} <- Children], - Listeners1 ++ Listeners2. - --spec broadcast_c2s_shutdown_sup() -> StoppedCount :: non_neg_integer(). -broadcast_c2s_shutdown_sup() -> - %% Websocket c2s connections have two processes per user: - %% - one is websocket Cowboy process. - %% - one is under mongoose_c2s_sup. - %% - %% Regular XMPP connections are not under mongoose_c2s_sup, - %% they are under the Ranch listener, which is a child of mongoose_listener_sup. - %% - %% We could use ejabberd_sm to get both Websocket and regular XMPP sessions, - %% but waiting till the list size is zero is much more computationally - %% expensive in that case. - Children = supervisor:which_children(mongoose_c2s_sup), - lists:foreach( - fun({_, Pid, _, _}) -> - mongoose_c2s:exit(Pid, system_shutdown) - end, - Children), - mongoose_lib:wait_until( - fun() -> - Res = supervisor:count_children(mongoose_c2s_sup), - proplists:get_value(active, Res) - end, - 0), - length(Children). - -%% Based on https://ninenines.eu/docs/en/ranch/2.1/guide/connection_draining/ --spec broadcast_c2s_shutdown_to_regular_c2s_connections(typed_listeners()) -> - non_neg_integer(). -broadcast_c2s_shutdown_to_regular_c2s_connections(TypedListeners) -> - Refs = [Ref || {ranch, Ref} <- TypedListeners], - StoppedCount = lists:foldl( - fun(Ref, Count) -> - Conns = ranch:procs(Ref, connections), - [mongoose_c2s:exit(Pid, system_shutdown) || Pid <- Conns], - length(Conns) + Count - end, 0, Refs), - lists:foreach( - fun(Ref) -> - ok = ranch:wait_for_connections(Ref, '==', 0) - end, Refs), - StoppedCount. - %%% %%% PID file %%% diff --git a/src/mongoose_listener.erl b/src/mongoose_listener.erl index df891771657..49fa682f225 100644 --- a/src/mongoose_listener.erl +++ b/src/mongoose_listener.erl @@ -10,6 +10,7 @@ %% API -export([start/0, stop/0]). +-export([suspend_listeners_and_shutdown_connections/0]). -callback start_listener(options()) -> ok. -callback instrumentation(options()) -> [mongoose_instrument:spec()]. @@ -23,6 +24,7 @@ any() => any()}. -type id() :: {inet:port_number(), inet:ip_address(), proto()}. -type proto() :: tcp. +-type typed_listeners() :: [{Type :: ranch | cowboy, Listener :: ranch:ref()}]. -export_type([options/0, id/0, proto/0]). @@ -73,3 +75,67 @@ listener_instrumentation(Opts = #{module := Module}) -> false -> [] end. + +-spec suspend_listeners_and_shutdown_connections() -> StoppedCount :: non_neg_integer(). +suspend_listeners_and_shutdown_connections() -> + TypedListeners = get_typed_listeners(), + suspend_listeners(TypedListeners), + broadcast_c2s_shutdown_sup() + + broadcast_c2s_shutdown_to_regular_c2s_connections(TypedListeners). + +-spec suspend_listeners(typed_listeners()) -> ok. +suspend_listeners(TypedListeners) -> + [ranch:suspend_listener(Ref) || {_Type, Ref} <- TypedListeners], + ok. + +-spec get_typed_listeners() -> typed_listeners(). +get_typed_listeners() -> + Children = supervisor:which_children(mongoose_listener_sup), + Listeners1 = [{cowboy, ejabberd_cowboy:ref(Listener)} + || {Listener, _, _, [ejabberd_cowboy]} <- Children], + Listeners2 = [{ranch, Ref} + || {Ref, _, _, [mongoose_c2s_listener]} <- Children], + Listeners1 ++ Listeners2. + +-spec broadcast_c2s_shutdown_sup() -> StoppedCount :: non_neg_integer(). +broadcast_c2s_shutdown_sup() -> + %% Websocket c2s connections have two processes per user: + %% - one is websocket Cowboy process. + %% - one is under mongoose_c2s_sup. + %% + %% Regular XMPP connections are not under mongoose_c2s_sup, + %% they are under the Ranch listener, which is a child of mongoose_listener_sup. + %% + %% We could use ejabberd_sm to get both Websocket and regular XMPP sessions, + %% but waiting till the list size is zero is much more computationally + %% expensive in that case. + Children = supervisor:which_children(mongoose_c2s_sup), + lists:foreach( + fun({_, Pid, _, _}) -> + mongoose_c2s:exit(Pid, system_shutdown) + end, + Children), + mongoose_lib:wait_until( + fun() -> + Res = supervisor:count_children(mongoose_c2s_sup), + proplists:get_value(active, Res) + end, + 0), + length(Children). + +%% Based on https://ninenines.eu/docs/en/ranch/2.1/guide/connection_draining/ +-spec broadcast_c2s_shutdown_to_regular_c2s_connections(typed_listeners()) -> + non_neg_integer(). +broadcast_c2s_shutdown_to_regular_c2s_connections(TypedListeners) -> + Refs = [Ref || {ranch, Ref} <- TypedListeners], + StoppedCount = lists:foldl( + fun(Ref, Count) -> + Conns = ranch:procs(Ref, connections), + [mongoose_c2s:exit(Pid, system_shutdown) || Pid <- Conns], + length(Conns) + Count + end, 0, Refs), + lists:foreach( + fun(Ref) -> + ok = ranch:wait_for_connections(Ref, '==', 0) + end, Refs), + StoppedCount.