diff --git a/big_tests/default.spec b/big_tests/default.spec index ed2070c1aad..b88352f08d1 100644 --- a/big_tests/default.spec +++ b/big_tests/default.spec @@ -121,6 +121,7 @@ {suites, "tests", cets_disco_SUITE}. {suites, "tests", start_node_id_SUITE}. {suites, "tests", tr_util_SUITE}. +{suites, "tests", shutdown_SUITE}. {config, ["test.config"]}. {logdir, "ct_report"}. diff --git a/big_tests/dynamic_domains.spec b/big_tests/dynamic_domains.spec index c451bbc6da9..3662dcd9760 100644 --- a/big_tests/dynamic_domains.spec +++ b/big_tests/dynamic_domains.spec @@ -163,6 +163,7 @@ {suites, "tests", cets_disco_SUITE}. {suites, "tests", start_node_id_SUITE}. {suites, "tests", tr_util_SUITE}. +{suites, "tests", shutdown_SUITE}. {config, ["dynamic_domains.config", "test.config"]}. diff --git a/big_tests/test.config b/big_tests/test.config index 84137c946da..c8dc2fa91aa 100644 --- a/big_tests/test.config +++ b/big_tests/test.config @@ -198,6 +198,13 @@ {server, <<"localhost">>}, {password, <<"password">>}, {port, 5252}]}, + {adam, [ %% used in mod_global_distrib_SUITE, websockets + {username, <<"adam">>}, + {server, <<"localhost">>}, + {password, <<"password">>}, + {transport, escalus_ws}, + {port, 5272}, + {wspath, <<"/ws-xmpp">>}]}, {neustradamus, [ {username, <<"neustradamus">>}, {server, <<"localhost">>}, @@ -348,6 +355,7 @@ scope = \"global\" workers = 10 strategy = \"random_worker\" + connection.database = {{redis_database_number}} [outgoing_pools.rdbms.default] scope = \"global\" workers = 5 diff --git a/big_tests/tests/mod_global_distrib_SUITE.erl b/big_tests/tests/mod_global_distrib_SUITE.erl index 217ad0cf12e..4c9c6aefd50 100644 --- a/big_tests/tests/mod_global_distrib_SUITE.erl +++ b/big_tests/tests/mod_global_distrib_SUITE.erl @@ -688,19 +688,26 @@ test_component_disconnect(Config) -> test_location_disconnect(Config) -> try escalus:fresh_story( - Config, [{alice, 1}, {eve, 1}], - fun(Alice, Eve) -> + Config, [{alice, 1}, {eve, 1}, {adam, 1}], + fun(Alice, Eve, Adam) -> escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)), - escalus_client:wait_for_stanza(Eve), + escalus_client:send(Alice, escalus_stanza:chat_to(Adam, <<"Hi, Adam, from Europe1!">>)), + escalus_client:wait_for_stanza(Adam), + + print_sessions_debug_info(asia_node), ok = rpc(asia_node, application, stop, [mongooseim]), %% TODO: Stopping mongooseim alone should probably stop connections too ok = rpc(asia_node, application, stop, [ranch]), escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)), Error = escalus:wait_for_stanza(Alice), - escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error) + escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error), + + escalus_client:send(Alice, escalus_stanza:chat_to(Adam, <<"Hi, Adam, again!">>)), + Error2 = escalus:wait_for_stanza(Alice), + escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error2) end) after rpc(asia_node, application, start, [ranch]), @@ -1410,6 +1417,27 @@ can_connect_to_port(Port) -> false end. 
+%% Prints information about the active sessions
+print_sessions_debug_info(NodeName) ->
+    Node = rpc(NodeName, erlang, node, []),
+    Nodes = rpc(NodeName, erlang, nodes, []),
+    ct:log("name=~p, erlang_node=~p, other_nodes=~p", [NodeName, Node, Nodes]),
+
+    Children = rpc(NodeName, supervisor, which_children, [mongoose_c2s_sup]),
+    ct:log("C2S processes under the supervisor ~p", [Children]),
+
+    Sessions = rpc(NodeName, ejabberd_sm, get_full_session_list, []),
+    ct:log("C2S processes in the session manager ~p", [Sessions]),
+
+    Sids = [element(2, Session) || Session <- Sessions],
+    Pids = [Pid || {_, Pid} <- Sids],
+    PidNodes = [{Pid, node(Pid)} || Pid <- Pids],
+    ct:log("Pids on nodes ~p", [PidNodes]),
+
+    Info = [{Pid, rpc:call(PidNode, erlang, process_info, [Pid])} || {Pid, PidNode} <- PidNodes],
+    ct:log("Processes info ~p", [Info]),
+    ok.
+
 %% -----------------------------------------------------------------------
 %% Custom log levels for GD modules during the tests
 
@@ -1433,10 +1461,14 @@ custom_loglevels() ->
      {mod_global_distrib_connection, debug},
      %% to check if gc or refresh is triggered
      {mod_global_distrib_server_mgr, info},
-    %% To debug incoming connections
+     %% To debug incoming connections
      % {mod_global_distrib_receiver, info},
-    %% to debug global session set/delete
-    {mod_global_distrib_mapping, debug}
+     %% to debug global session set/delete
+     {mod_global_distrib_mapping, debug},
+     %% To log make_error_reply calls
+     {jlib, debug},
+     %% to log sm_route
+     {ejabberd_sm, debug}
     ].
 
 test_hosts() -> [mim, mim2, reg].
diff --git a/big_tests/tests/shutdown_SUITE.erl b/big_tests/tests/shutdown_SUITE.erl
new file mode 100644
index 00000000000..dcfd5423ece
--- /dev/null
+++ b/big_tests/tests/shutdown_SUITE.erl
@@ -0,0 +1,139 @@
+-module(shutdown_SUITE).
+
+-compile([export_all, nowarn_export_all]).
+-import(distributed_helper, [mim/0, rpc/4]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("exml/include/exml.hrl").
+
+all() ->
+    [{group, main}].
+
+groups() ->
+    [{main, [], cases()}].
+
+cases() ->
+    [shutdown,
+     client_tries_to_connect_before_listener_stop].
+
+init_per_suite(Config) ->
+    mongoose_helper:inject_module(?MODULE),
+    escalus:init_per_suite(Config).
+
+end_per_suite(Config) ->
+    escalus:end_per_suite(Config).
+
+init_per_group(_, Config) ->
+    escalus:create_users(Config, escalus:get_users([geralt_s, alice])),
+    Config.
+
+end_per_group(_, Config) ->
+    escalus:delete_users(Config, escalus:get_users([geralt_s, alice])),
+    Config.
+
+init_per_testcase(shutdown = TC, Config) ->
+    logger_ct_backend:start(),
+    escalus:init_per_testcase(TC, Config);
+init_per_testcase(client_tries_to_connect_before_listener_stop = TC, Config) ->
+    escalus:init_per_testcase(TC, Config);
+init_per_testcase(Name, Config) ->
+    escalus:init_per_testcase(Name, Config).
+
+end_per_testcase(shutdown = TC, Config) ->
+    logger_ct_backend:stop(),
+    escalus:end_per_testcase(TC, Config);
+end_per_testcase(client_tries_to_connect_before_listener_stop = TC, Config) ->
+    rpc(mim(), meck, unload, []),
+    escalus:end_per_testcase(TC, Config);
+end_per_testcase(Name, Config) ->
+    escalus:end_per_testcase(Name, Config).
+
+shutdown(Config) ->
+    UserSpec = escalus_users:get_userspec(Config, geralt_s),
+    {ok, Alice, _} = escalus_connection:start(UserSpec),
+    logger_ct_backend:capture(error),
+    ejabberd_node_utils:restart_application(mongooseim),
+    logger_ct_backend:stop_capture(),
+    FilterFun = fun(_, Msg) -> re:run(Msg, "event_not_registered") /= nomatch end,
+    [] = logger_ct_backend:recv(FilterFun),
+    %% Ensure that Alice gets a shutdown stanza
+    escalus:assert(is_stream_error, [<<"system-shutdown">>, <<>>],
+                   escalus_client:wait_for_stanza(Alice)),
+    escalus:assert(is_stream_end, escalus_client:wait_for_stanza(Alice)),
+    true = escalus_connection:wait_for_close(Alice, timer:seconds(1)).
+
+client_tries_to_connect_before_listener_stop(Config) ->
+    %% Ensures that a user cannot connect while we are stopping
+    %% the listeners and the other c2s processes
+    UserSpec = escalus_users:get_userspec(Config, geralt_s),
+    PortNum = proplists:get_value(port, UserSpec),
+    %% Ask MongooseIM to pause the stopping process,
+    %% so we can check that the listeners are suspended correctly
+    block_listener(),
+    %% Check that the listener is working
+    {ok, ConnPort} = gen_tcp:connect("127.0.0.1", PortNum, []),
+    %% Trigger the restarting logic in a separate parallel process
+    RestPid = restart_application_non_blocking(),
+    %% Wait until we are blocked in mongoose_listener:stop/0
+    Called = wait_for_called(),
+    {error, econnrefused} = gen_tcp:connect("127.0.0.1", PortNum, []),
+    %% Resume stopping the listeners
+    resume(Called),
+    %% Check that the old TCP connections are closed
+    receive_tcp_closed(ConnPort),
+    %% Wait until mongooseim is fully restarted
+    wait_for_down(RestPid).
+
+block_listener() ->
+    rpc(mim(), meck, new, [mongoose_listener, [no_link, passthrough]]),
+    Pid = self(),
+    F = fun() ->
+            wait_for_resume(Pid),
+            meck:passthrough([])
+        end,
+    rpc(mim(), meck, expect, [mongoose_listener, stop, F]).
+
+restart_application_non_blocking() ->
+    spawn_link(fun() -> ejabberd_node_utils:restart_application(mongooseim) end).
+
+wait_for_called() ->
+    receive
+        {called, Called} ->
+            Called
+    after 5000 ->
+        error(wait_for_called_timeout)
+    end.
+
+%% Blocks the mocked process until resume/1 is called
+wait_for_resume(Pid) ->
+    MonRef = erlang:monitor(process, Pid),
+    Called = {self(), MonRef},
+    Pid ! {called, Called},
+    receive
+        {'DOWN', MonRef, process, _, _} -> ok;
+        {resume, MonRef} -> ok
+    end.
+
+%% Tells the mocked process to resume the operation
+resume({Pid, MonRef}) ->
+    Pid ! {resume, MonRef}.
+
+wait_for_down(Pid) ->
+    MonRef = erlang:monitor(process, Pid),
+    receive
+        {'DOWN', MonRef, process, _, _} -> ok
+    after 5000 ->
+        ct:pal("wait_for_down current_stacktrace ~p",
+               [rpc:pinfo(Pid, current_stacktrace)]),
+        ct:fail(wait_for_down_timeout)
+    end.
+
+%% Waits until the TCP socket is closed
+receive_tcp_closed(ConnPort) ->
+    receive
+        {tcp_closed, ConnPort} ->
+            ok
+    after 5000 ->
+        ct:fail(wait_for_tcp_close_timeout)
+    end.
diff --git a/include/safely.hrl b/include/safely.hrl
index b9b07a9504a..f79cb2ae36d 100644
--- a/include/safely.hrl
+++ b/include/safely.hrl
@@ -4,7 +4,7 @@
 -define(SAFELY(F),
     try F
     catch
         error:R:S -> {exception, #{class => error, reason => R, stacktrace => S}};
-        throw:R -> {exception, #{class => throw, reason => R}};
+        throw:R:S -> {exception, #{class => throw, reason => R, stacktrace => S}};
         exit:R:S -> {exception, #{class => exit, reason => R, stacktrace => S}}
     end).
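With this change the ?SAFELY macro captures a stacktrace for thrown values, the same way it already did for errors and exits. A minimal sketch of how calling code could rely on that, assuming safely.hrl is included; the module and function names below are purely illustrative and not part of this change:

    -module(safely_throw_example).
    -include("safely.hrl").
    -export([run/0]).

    %% Returns {error, Reason, Stacktrace} instead of crashing the caller.
    run() ->
        case ?SAFELY(throw(not_ready)) of
            {exception, #{class := throw, reason := Reason, stacktrace := Stack}} ->
                %% The stacktrace key is now present for throws as well
                {error, Reason, Stack};
            Result ->
                Result
        end.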
diff --git a/rel/fed1.vars-toml.config b/rel/fed1.vars-toml.config
index 3f15efe96bc..c90682fc67c 100644
--- a/rel/fed1.vars-toml.config
+++ b/rel/fed1.vars-toml.config
@@ -19,6 +19,7 @@
 {hosts, "\"fed1\", \"fed2\""}.
 {default_server_domain, "\"fed1\""}.
 {cluster_name, "fed"}.
+{redis_database_number, "2"}.
 
 %% domain.example.com is for multitenancy preset, muc_SUITE:register_over_s2s
 {s2s_addr, "[[s2s.address]]
diff --git a/rel/mim1.vars-toml.config b/rel/mim1.vars-toml.config
index 0aa1b9467e1..207a2010fd5 100644
--- a/rel/mim1.vars-toml.config
+++ b/rel/mim1.vars-toml.config
@@ -22,6 +22,7 @@
 {host_types, "\"test type\", \"dummy auth\", \"anonymous\""}.
 {default_server_domain, "\"localhost\""}.
 {cluster_name, "mim"}.
+{redis_database_number, "0"}.
 
 {mod_amp, ""}.
 {host_config,
diff --git a/rel/mim2.vars-toml.config b/rel/mim2.vars-toml.config
index e72db987632..d0b0d45f93e 100644
--- a/rel/mim2.vars-toml.config
+++ b/rel/mim2.vars-toml.config
@@ -20,6 +20,7 @@
 {host_types, "\"test type\", \"dummy auth\""}.
 {default_server_domain, "\"localhost\""}.
 {cluster_name, "mim"}.
+{redis_database_number, "0"}.
 {s2s_addr, "[[s2s.address]]
              host = \"localhost2\"
              ip_address = \"127.0.0.1\""}.
diff --git a/rel/mim3.vars-toml.config b/rel/mim3.vars-toml.config
index a062094a919..9f14b599390 100644
--- a/rel/mim3.vars-toml.config
+++ b/rel/mim3.vars-toml.config
@@ -23,6 +23,7 @@
 {hosts, "\"localhost\", \"anonymous.localhost\", \"localhost.bis\""}.
 {default_server_domain, "\"localhost\""}.
 {cluster_name, "mim"}.
+{redis_database_number, "0"}.
 
 {s2s_addr, "[[s2s.address]]
              host = \"localhost2\"
diff --git a/rel/reg1.vars-toml.config b/rel/reg1.vars-toml.config
index 9462854da8c..acca201257f 100644
--- a/rel/reg1.vars-toml.config
+++ b/rel/reg1.vars-toml.config
@@ -23,6 +23,7 @@
 {hosts, "\"reg1\", \"localhost\""}.
 {default_server_domain, "\"reg1\""}.
 {cluster_name, "reg"}.
+{redis_database_number, "1"}.
 
 {s2s_addr, "[[s2s.address]]
              host = \"localhost\"
diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl
index 0939a6d07de..877d82ebe1f 100644
--- a/src/ejabberd_app.erl
+++ b/src/ejabberd_app.erl
@@ -34,7 +34,6 @@
 -include("mongoose.hrl").
 
-
 %%%
 %%% Application API
 %%%
 
@@ -86,23 +85,23 @@ do_start() ->
 %% @doc Prepare the application for termination.
 %% This function is called when an application is about to be stopped,
 %% before shutting down the processes of the application.
-prep_stop(State) ->
+prep_stop(_State) ->
     mongoose_deprecations:stop(),
-    broadcast_c2s_shutdown_listeners(),
+    StoppedCount = mongoose_listener:suspend_listeners_and_shutdown_connections(),
     mongoose_listener:stop(),
     mongoose_modules:stop(),
     mongoose_service:stop(),
-    broadcast_c2s_shutdown_sup(),
     mongoose_wpool:stop(),
     mongoose_graphql_commands:stop(),
     mongoose_router:stop(),
     mongoose_system_probes:stop(),
-    State.
+    #{stopped_count => StoppedCount}.
 
 %% All the processes were killed when this function is called
-stop(_State) ->
+stop(#{stopped_count := StoppedCount}) ->
     mongoose_config:stop(),
-    ?LOG_NOTICE(#{what => mongooseim_node_stopped, version => ?MONGOOSE_VERSION, node => node()}),
+    ?LOG_NOTICE(#{what => mongooseim_node_stopped, version => ?MONGOOSE_VERSION,
+                  node => node(), stopped_sessions_count => StoppedCount}),
     delete_pid_file(),
     update_status_file(stopped),
     %% We cannot stop other applications inside of the stop callback
     %% That is why we call mnesia:stop() inside of db_init_mnesia() instead.
     ok.
-%%% -%%% Internal functions -%%% - --spec broadcast_c2s_shutdown_listeners() -> ok. -broadcast_c2s_shutdown_listeners() -> - Children = supervisor:which_children(mongoose_listener_sup), - Listeners = [Ref || {Ref, _, _, [mongoose_c2s_listener]} <- Children], - lists:foreach( - fun(Listener) -> - ranch:suspend_listener(Listener), - [mongoose_c2s:exit(Pid, system_shutdown) || Pid <- ranch:procs(Listener, connections)], - mongoose_lib:wait_until( - fun() -> - length(ranch:procs(Listener, connections)) - end, - 0) - end, - Listeners). - --spec broadcast_c2s_shutdown_sup() -> ok. -broadcast_c2s_shutdown_sup() -> - Children = supervisor:which_children(mongoose_c2s_sup), - lists:foreach( - fun({_, Pid, _, _}) -> - mongoose_c2s:exit(Pid, system_shutdown) - end, - Children), - mongoose_lib:wait_until( - fun() -> - Res = supervisor:count_children(mongoose_c2s_sup), - proplists:get_value(active, Res) - end, - 0). - %%% %%% PID file %%% diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index cd80c06b055..bdecab80698 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -623,7 +623,8 @@ do_route(Acc, From, To, El) -> do_route_offline(Name, mongoose_acc:stanza_type(Acc), From, To, Acc, El); Pid when is_pid(Pid) -> - ?LOG_DEBUG(#{what => sm_route_to_pid, session_pid => Pid, acc => Acc}), + ?LOG_DEBUG(#{what => sm_route_to_pid, session_pid => Pid, + session_node => node(Pid), acc => Acc}), mongoose_c2s:route(Pid, Acc), Acc end diff --git a/src/ejabberd_sm_redis.erl b/src/ejabberd_sm_redis.erl index 39f7e7f52f9..fbe584a7076 100644 --- a/src/ejabberd_sm_redis.erl +++ b/src/ejabberd_sm_redis.erl @@ -26,14 +26,15 @@ -ignore_xref([maybe_initial_cleanup/2]). --spec init(map()) -> any(). +-spec init(map()) -> ok. init(_Opts) -> %% Clean current node's sessions from previous life - {Elapsed, RetVal} = timer:tc(?MODULE, maybe_initial_cleanup, [node(), true]), + {Elapsed, Count} = timer:tc(?MODULE, maybe_initial_cleanup, [node(), true]), ?LOG_NOTICE(#{what => sm_cleanup_initial, text => <<"SM cleanup on start took">>, - duration => erlang:round(Elapsed / 1000)}), - RetVal. + duration => erlang:round(Elapsed / 1000), + removed_sessions_count => Count}), + ok. -spec get_sessions() -> [ejabberd_sm:session()]. get_sessions() -> @@ -109,11 +110,12 @@ delete_session(SID, User, Server, Resource) -> ok end. --spec cleanup(atom()) -> any(). +-spec cleanup(atom()) -> ok. cleanup(Node) -> - maybe_initial_cleanup(Node, false). + maybe_initial_cleanup(Node, false), + ok. --spec maybe_initial_cleanup(atom(), boolean()) -> any(). +-spec maybe_initial_cleanup(atom(), boolean()) -> non_neg_integer(). maybe_initial_cleanup(Node, Initial) -> Hashes = mongoose_redis:cmd(["SMEMBERS", n(Node)]), mongoose_redis:cmd(["DEL", n(Node)]), @@ -131,7 +133,8 @@ maybe_initial_cleanup(Node, Initial) -> Session end end, Hashes), - ejabberd_sm:sessions_cleanup(Sessions). + ejabberd_sm:sessions_cleanup(Sessions), + length(Sessions). -spec total_count() -> integer(). total_count() -> diff --git a/src/global_distrib/mod_global_distrib.erl b/src/global_distrib/mod_global_distrib.erl index d20a2324a75..3f0db76b3f5 100644 --- a/src/global_distrib/mod_global_distrib.erl +++ b/src/global_distrib/mod_global_distrib.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%============================================================================== -%% Intercepts filter_packet hook with maybe_reroute callback. +%% Intercepts filter_packet hook. -module(mod_global_distrib). -author('konrad.zemek@erlang-solutions.com'). 
@@ -28,7 +28,7 @@ -export([deps/2, start/2, stop/1, config_spec/0, instrumentation/0]). -export([find_metadata/2, get_metadata/3, remove_metadata/2, put_metadata/3]). --export([maybe_reroute/3]). +-export([filter_packet/3]). -export([process_opts/1, process_endpoint/1]). -ignore_xref([remove_metadata/2, instrumentation/0]). @@ -77,7 +77,10 @@ instrumentation() -> {?GLOBAL_DISTRIB_STOP_TTL_ZERO, #{}, #{metrics => #{count => spiral}}}]. hooks() -> - [{filter_packet, global, fun ?MODULE:maybe_reroute/3, #{}, 99}]. + %% filter_packet is called in mongoose_router_global as a first + %% element of the routing chain. + %% mongoose_router_localdomain is called next. + [{filter_packet, global, fun ?MODULE:filter_packet/3, #{}, 99}]. -spec config_spec() -> mongoose_config_spec:config_section(). config_spec() -> @@ -229,11 +232,11 @@ remove_metadata(Acc, Key) -> %% Hooks implementation %%-------------------------------------------------------------------- --spec maybe_reroute(FPacket, Params, Extra) -> {ok, FPacket} | {stop, drop} when +-spec filter_packet(FPacket, Params, Extra) -> {ok, FPacket} | {stop, drop} when FPacket :: mongoose_hooks:filter_packet_acc(), Params :: map(), Extra :: map(). -maybe_reroute({#jid{ luser = SameUser, lserver = SameServer } = _From, +filter_packet({#jid{ luser = SameUser, lserver = SameServer } = _From, #jid{ luser = SameUser, lserver = SameServer } = _To, _Acc, _Packet} = FPacket, _, _) -> %% GD is not designed to support two user sessions existing in distinct clusters @@ -244,7 +247,7 @@ maybe_reroute({#jid{ luser = SameUser, lserver = SameServer } = _From, %% from unacked SM buffer, leading to an error, while a brand new, shiny Eve %% on mim1 was waiting. {ok, FPacket}; -maybe_reroute({From, To, _, Packet} = FPacket, _, _) -> +filter_packet({From, To, _, Packet} = FPacket, _, _) -> Acc = maybe_initialize_metadata(FPacket), {ok, ID} = find_metadata(Acc, id), LocalHost = opt(local_host), diff --git a/src/jlib.erl b/src/jlib.erl index 7813c64d0fa..fa56fab7f00 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -120,14 +120,16 @@ make_result_iq_reply_attrs(Attrs) -> -spec make_error_reply(exml:element() | mongoose_acc:t(), xmlcdata() | exml:element()) -> exml:element() | {mongoose_acc:t(), exml:element() | {error, {already_an_error, _, _}}}. -make_error_reply(#xmlel{name = Name, attrs = Attrs, - children = SubTags}, Error) -> - NewAttrs = make_error_reply_attrs(Attrs), - #xmlel{name = Name, attrs = NewAttrs, children = SubTags ++ [Error]}; +make_error_reply(#xmlel{} = Elem, Error) -> + ?LOG_DEBUG(#{what => make_error_reply, + exml_packet => Elem, error_element => Error}), + make_error_reply_from_element(Elem, Error); make_error_reply(Acc, Error) -> make_error_reply(Acc, mongoose_acc:element(Acc), Error). make_error_reply(Acc, Packet, Error) -> + ?LOG_DEBUG(#{what => make_error_reply, + acc => Acc, error_element => Error}), case mongoose_acc:get(flag, error, false, Acc) of true -> ?LOG_ERROR(#{what => error_reply_to_error, exml_packet => Packet, @@ -135,9 +137,14 @@ make_error_reply(Acc, Packet, Error) -> {Acc, {error, {already_an_error, Packet, Error}}}; _ -> {mongoose_acc:set(flag, error, true, Acc), - make_error_reply(Packet, Error)} + make_error_reply_from_element(Packet, Error)} end. +make_error_reply_from_element(#xmlel{name = Name, attrs = Attrs, + children = SubTags}, Error) -> + NewAttrs = make_error_reply_attrs(Attrs), + #xmlel{name = Name, attrs = NewAttrs, children = SubTags ++ [Error]}. + -spec make_error_reply_attrs([binary_pair()]) -> [binary_pair(), ...]. 
make_error_reply_attrs(Attrs) -> To = xml:get_attr(<<"to">>, Attrs), diff --git a/src/mongoose_listener.erl b/src/mongoose_listener.erl index df891771657..49fa682f225 100644 --- a/src/mongoose_listener.erl +++ b/src/mongoose_listener.erl @@ -10,6 +10,7 @@ %% API -export([start/0, stop/0]). +-export([suspend_listeners_and_shutdown_connections/0]). -callback start_listener(options()) -> ok. -callback instrumentation(options()) -> [mongoose_instrument:spec()]. @@ -23,6 +24,7 @@ any() => any()}. -type id() :: {inet:port_number(), inet:ip_address(), proto()}. -type proto() :: tcp. +-type typed_listeners() :: [{Type :: ranch | cowboy, Listener :: ranch:ref()}]. -export_type([options/0, id/0, proto/0]). @@ -73,3 +75,67 @@ listener_instrumentation(Opts = #{module := Module}) -> false -> [] end. + +-spec suspend_listeners_and_shutdown_connections() -> StoppedCount :: non_neg_integer(). +suspend_listeners_and_shutdown_connections() -> + TypedListeners = get_typed_listeners(), + suspend_listeners(TypedListeners), + broadcast_c2s_shutdown_sup() + + broadcast_c2s_shutdown_to_regular_c2s_connections(TypedListeners). + +-spec suspend_listeners(typed_listeners()) -> ok. +suspend_listeners(TypedListeners) -> + [ranch:suspend_listener(Ref) || {_Type, Ref} <- TypedListeners], + ok. + +-spec get_typed_listeners() -> typed_listeners(). +get_typed_listeners() -> + Children = supervisor:which_children(mongoose_listener_sup), + Listeners1 = [{cowboy, ejabberd_cowboy:ref(Listener)} + || {Listener, _, _, [ejabberd_cowboy]} <- Children], + Listeners2 = [{ranch, Ref} + || {Ref, _, _, [mongoose_c2s_listener]} <- Children], + Listeners1 ++ Listeners2. + +-spec broadcast_c2s_shutdown_sup() -> StoppedCount :: non_neg_integer(). +broadcast_c2s_shutdown_sup() -> + %% Websocket c2s connections have two processes per user: + %% - one is websocket Cowboy process. + %% - one is under mongoose_c2s_sup. + %% + %% Regular XMPP connections are not under mongoose_c2s_sup, + %% they are under the Ranch listener, which is a child of mongoose_listener_sup. + %% + %% We could use ejabberd_sm to get both Websocket and regular XMPP sessions, + %% but waiting till the list size is zero is much more computationally + %% expensive in that case. + Children = supervisor:which_children(mongoose_c2s_sup), + lists:foreach( + fun({_, Pid, _, _}) -> + mongoose_c2s:exit(Pid, system_shutdown) + end, + Children), + mongoose_lib:wait_until( + fun() -> + Res = supervisor:count_children(mongoose_c2s_sup), + proplists:get_value(active, Res) + end, + 0), + length(Children). + +%% Based on https://ninenines.eu/docs/en/ranch/2.1/guide/connection_draining/ +-spec broadcast_c2s_shutdown_to_regular_c2s_connections(typed_listeners()) -> + non_neg_integer(). +broadcast_c2s_shutdown_to_regular_c2s_connections(TypedListeners) -> + Refs = [Ref || {ranch, Ref} <- TypedListeners], + StoppedCount = lists:foldl( + fun(Ref, Count) -> + Conns = ranch:procs(Ref, connections), + [mongoose_c2s:exit(Pid, system_shutdown) || Pid <- Conns], + length(Conns) + Count + end, 0, Refs), + lists:foreach( + fun(Ref) -> + ok = ranch:wait_for_connections(Ref, '==', 0) + end, Refs), + StoppedCount.
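The new broadcast_c2s_shutdown_to_regular_c2s_connections/1 follows the Ranch connection-draining recipe linked above: suspend the listener first so no new connections are accepted, ask the existing connection processes to exit, then wait for the connection count to reach zero. A compressed sketch of that sequence for a single listener, assuming a plain Ranch setup where exit/2 is enough to stop a connection process (MongooseIM uses mongoose_c2s:exit/2 instead, so the c2s process can emit the system-shutdown stream error before closing); drain_listener/1 is a hypothetical helper, not part of this patch:

    %% Drain one Ranch listener identified by Ref.
    drain_listener(Ref) ->
        ok = ranch:suspend_listener(Ref),
        [exit(Pid, shutdown) || Pid <- ranch:procs(Ref, connections)],
        ok = ranch:wait_for_connections(Ref, '==', 0).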