From f80f0906d6dca513f6df6bb3904c9af28b571e66 Mon Sep 17 00:00:00 2001 From: Gustaw Lippa Date: Mon, 2 Dec 2024 18:28:01 +0100 Subject: [PATCH 1/7] Add delays to SM buffer It is rerouted when no session was resumed and the resume_timeout has passed. This resulted in suprising messages, sometimes after a long time. --- src/stream_management/mod_stream_management.erl | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/stream_management/mod_stream_management.erl b/src/stream_management/mod_stream_management.erl index d7ac5c5a1bd..39dbae4b1fa 100644 --- a/src/stream_management/mod_stream_management.erl +++ b/src/stream_management/mod_stream_management.erl @@ -386,7 +386,9 @@ maybe_handle_stream_mgmt_reroute(Acc, _StateData, _HostType, _Reason, {error, no handle_user_terminate(#sm_state{counter_in = H} = SmState, StateData, HostType) -> Sid = mongoose_c2s:get_sid(StateData), do_remove_smid(HostType, Sid, H), - reroute_buffer(StateData, SmState), + FromServer = mongoose_c2s:get_lserver(StateData), + NewState = add_delay_elements_to_buffer(SmState, FromServer), + reroute_buffer(StateData, NewState), SmState#sm_state{buffer = [], buffer_size = 0}. reroute_buffer(StateData, #sm_state{buffer = Buffer, peer = {gen_statem, {Pid, _}}}) -> @@ -394,6 +396,18 @@ reroute_buffer(StateData, #sm_state{buffer = Buffer, peer = {gen_statem, {Pid, _ reroute_buffer(StateData, #sm_state{buffer = Buffer}) -> mongoose_c2s:reroute_buffer(StateData, Buffer). +add_delay_elements_to_buffer(#sm_state{buffer = Buffer} = SmState, FromServer) -> + BufferWithDelays = [begin + TS = mongoose_acc:timestamp(Acc), + StanzaName = mongoose_acc:stanza_name(Acc), + StanzaType = mongoose_acc:stanza_type(Acc), + {From, To, El} = mongoose_acc:packet(Acc), + ElWithDelay = maybe_add_timestamp(El, StanzaName, StanzaType, TS, FromServer), + AccParams = #{from_jid => From, to_jid => To, element => ElWithDelay}, + mongoose_acc:update_stanza(AccParams, Acc) + end || Acc <- Buffer], + SmState#sm_state{buffer = BufferWithDelays}. + -spec terminate(term(), c2s_state(), mongoose_c2s:data()) -> term(). terminate(Reason, C2SState, StateData) -> ?LOG_DEBUG(#{what => stream_mgmt_statem_terminate, reason => Reason, From 53e212045b866435697c75deafbbf492ec3c6412 Mon Sep 17 00:00:00 2001 From: Gustaw Lippa Date: Thu, 5 Dec 2024 09:59:02 +0100 Subject: [PATCH 2/7] Add delayed tag check --- big_tests/tests/sm_SUITE.erl | 2 +- big_tests/tests/sm_helper.erl | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/big_tests/tests/sm_SUITE.erl b/big_tests/tests/sm_SUITE.erl index e04376d113d..a4d3257e325 100644 --- a/big_tests/tests/sm_SUITE.erl +++ b/big_tests/tests/sm_SUITE.erl @@ -551,7 +551,7 @@ resend_unacked_on_reconnection(Config) -> %% User receives the messages from the offline store. NewUser = connect_spec(UserSpec, session, manual), send_initial_presence(NewUser), - sm_helper:wait_for_messages(NewUser, Texts), + sm_helper:wait_for_delayed_messages(NewUser, Texts), %% User acks the delayed messages so they won't go again %% to the offline store. escalus_connection:send(NewUser, escalus_stanza:sm_ack(3)). diff --git a/big_tests/tests/sm_helper.erl b/big_tests/tests/sm_helper.erl index 29edd9bd350..554a8013561 100644 --- a/big_tests/tests/sm_helper.erl +++ b/big_tests/tests/sm_helper.erl @@ -35,6 +35,7 @@ -export([send_initial_presence/1, send_messages/3, wait_for_messages/2, + wait_for_delayed_messages/2, assert_messages/2, send_and_receive/3, get_ack/1, @@ -293,6 +294,12 @@ wait_for_messages(Alice, Texts) -> Stanzas = escalus:wait_for_stanzas(Alice, length(Texts)), assert_messages(Stanzas, Texts). +wait_for_delayed_messages(Alice, Texts) -> + Stanzas = escalus:wait_for_stanzas(Alice, length(Texts)), + assert_messages(Stanzas, Texts), + [escalus:assert(has_ns, [<<"urn:xmpp:delay">>], exml_query:subelement(S, <<"delay">>)) + || S <- Stanzas]. + assert_messages(Stanzas, Texts) -> Bodies = lists:map(fun get_body/1, Stanzas), case Bodies of From d52cf09e0e44dd923aa71efb0836ff7b1a1891a7 Mon Sep 17 00:00:00 2001 From: Gustaw Lippa Date: Thu, 5 Dec 2024 18:05:36 +0100 Subject: [PATCH 3/7] Add description to mod_presence delay element This is allowed by the delayed delivery XEP. It should make it more clear where the delay is coming from for the user and in the case of testing/debugging. --- src/mod_presence.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mod_presence.erl b/src/mod_presence.erl index 9da95035247..3e3cc1ee696 100644 --- a/src/mod_presence.erl +++ b/src/mod_presence.erl @@ -312,7 +312,7 @@ route_probe(Acc, Presences, FromJid, ToJid) -> Packet0 = Presences#presences_state.pres_last, TS = Presences#presences_state.pres_timestamp, %% To is the one sending the presence (the target of the probe) - Packet1 = jlib:maybe_append_delay(Packet0, ToJid, TS, <<>>), + Packet1 = jlib:maybe_append_delay(Packet0, ToJid, TS, <<"Delayed presence">>), HostType = mongoose_acc:host_type(Acc), Acc2 = mongoose_hooks:presence_probe(HostType, Acc, FromJid, ToJid, self()), %% Don't route a presence probe to oneself From 592147744b1f4c61f2ce2626ccd2927d6a385709 Mon Sep 17 00:00:00 2001 From: Gustaw Lippa Date: Thu, 5 Dec 2024 18:41:03 +0100 Subject: [PATCH 4/7] Test stream_management resume_timeout The tests weren't really waiting for a timeout before. Extracted the tests into their own group and added one with different resource to show how it is handled by MIM. --- big_tests/tests/sm_SUITE.erl | 57 ++++++++++++++++++++++++++++++++--- big_tests/tests/sm_helper.erl | 7 +++-- 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/big_tests/tests/sm_SUITE.erl b/big_tests/tests/sm_SUITE.erl index a4d3257e325..122deeda01d 100644 --- a/big_tests/tests/sm_SUITE.erl +++ b/big_tests/tests/sm_SUITE.erl @@ -63,7 +63,8 @@ groups() -> {parallel_manual_ack_freq_1, [parallel], parallel_manual_ack_freq_1_cases()}, {manual_ack_freq_2, [], manual_ack_freq_2_cases()}, {stale_h, [], stale_h_cases()}, - {parallel_unacknowledged_message_hook, [parallel], parallel_unacknowledged_message_hook_cases()} + {parallel_unacknowledged_message_hook, [parallel], parallel_unacknowledged_message_hook_cases()}, + {resume_timeout, [parallel], resume_timeout_cases()} ]. ws_tests() -> @@ -72,6 +73,7 @@ ws_tests() -> {group, manual_ack_freq_2}, {group, stale_h}, {group, parallel_unacknowledged_message_hook}, + {group, resume_timeout}, ping_timeout]. tcp_tests() -> @@ -80,6 +82,7 @@ tcp_tests() -> {group, manual_ack_freq_2}, {group, stale_h}, {group, parallel_unacknowledged_message_hook}, + {group, resume_timeout}, ping_timeout]. parallel_cases() -> @@ -116,7 +119,6 @@ parallel_cases() -> parallel_manual_ack_freq_1_cases() -> [client_acks_more_than_sent, too_many_unacked_stanzas, - resend_unacked_after_resume_timeout, resume_session_state_send_message_with_ack, resume_session_state_send_message_without_ack, resume_session_state_stop_c2s, @@ -127,6 +129,10 @@ parallel_manual_ack_freq_1_cases() -> manual_ack_freq_2_cases() -> [server_requests_ack_freq_2]. +resume_timeout_cases() -> + [resend_unacked_after_resume_timeout, + resend_unacked_to_different_res_after_resume_timeout]. + stale_h_cases() -> [resume_expired_session_returns_correct_h, gc_repeat_after_never_means_no_cleaning, @@ -258,6 +264,8 @@ required_sm_opts(group, parallel_unacknowledged_message_hook) -> #{ack_freq => 1}; required_sm_opts(group, manual_ack_freq_long_session_timeout) -> #{ack_freq => 1, buffer_max => 1000}; +required_sm_opts(group, resume_timeout) -> + #{ack_freq => 1, resume_timeout => ?SHORT_TIMEOUT}; required_sm_opts(testcase, resume_expired_session_returns_correct_h) -> #{ack_freq => 1, resume_timeout => ?SHORT_TIMEOUT, @@ -630,7 +638,7 @@ resend_unacked_after_resume_timeout(Config) -> User = connect_fresh(Config, ?config(user, Config), sr_presence), UserSpec = sm_helper:client_to_spec(User), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)), + escalus_connection:send(Bob, escalus_stanza:chat_to(User, <<"msg-1">>)), %% kill user connection escalus_connection:kill(User), @@ -638,12 +646,51 @@ resend_unacked_after_resume_timeout(Config) -> C2SPid = mongoose_helper:get_session_pid(User), sm_helper:wait_until_resume_session(C2SPid), - %% user come back and receives unacked message + %% user comes back NewUser = connect_spec(UserSpec, session), send_initial_presence(NewUser), + %% resume timeout passes + timer:sleep(timer:seconds(?SHORT_TIMEOUT + 1)), + + %% user receives unacked message and initial presence + UnackedStanzas = escalus:wait_for_stanzas(NewUser, 2), escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], - escalus:wait_for_stanzas(NewUser, 2)), + UnackedStanzas), + [UnackedMsg] = lists:filter(fun escalus_pred:is_message/1, UnackedStanzas), + sm_helper:assert_delayed(UnackedMsg), + escalus_assert:has_no_stanzas(NewUser), + + escalus_connection:stop(Bob), + escalus_connection:stop(NewUser). + + +resend_unacked_to_different_res_after_resume_timeout(Config) -> + %% connect bob and user + Bob = connect_fresh(Config, bob, presence), + User = connect_fresh(Config, ?config(user, Config), sr_presence), + UserSpec = sm_helper:client_to_spec(User), + + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)), + %% kill user connection + escalus_connection:kill(User), + + %% ensure there is no session + C2SPid = mongoose_helper:get_session_pid(User), + sm_helper:wait_until_resume_session(C2SPid), + + %% user comes back with different resource + NewUser = connect_spec([{resource, <<"2nd_resource">>} | UserSpec], sr_presence), + + %% resume timeout passes + timer:sleep(timer:seconds(?SHORT_TIMEOUT + 1)), + + %% user receives unacked message and presence + UnackedStanzas = escalus:wait_for_stanzas(NewUser, 2), + escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], UnackedStanzas), + [sm_helper:assert_delayed(S) || S <- UnackedStanzas], + + escalus_assert:has_no_stanzas(NewUser), escalus_connection:stop(Bob), escalus_connection:stop(NewUser). diff --git a/big_tests/tests/sm_helper.erl b/big_tests/tests/sm_helper.erl index 554a8013561..56ef5b2107e 100644 --- a/big_tests/tests/sm_helper.erl +++ b/big_tests/tests/sm_helper.erl @@ -36,6 +36,7 @@ send_messages/3, wait_for_messages/2, wait_for_delayed_messages/2, + assert_delayed/1, assert_messages/2, send_and_receive/3, get_ack/1, @@ -297,8 +298,10 @@ wait_for_messages(Alice, Texts) -> wait_for_delayed_messages(Alice, Texts) -> Stanzas = escalus:wait_for_stanzas(Alice, length(Texts)), assert_messages(Stanzas, Texts), - [escalus:assert(has_ns, [<<"urn:xmpp:delay">>], exml_query:subelement(S, <<"delay">>)) - || S <- Stanzas]. + [assert_delayed(S) || S <- Stanzas]. + +assert_delayed(Stanza) -> + escalus:assert(has_ns, [<<"urn:xmpp:delay">>], exml_query:subelement(Stanza, <<"delay">>)). assert_messages(Stanzas, Texts) -> Bodies = lists:map(fun get_body/1, Stanzas), From 30722921c48222e27208db4638a1365b3930917f Mon Sep 17 00:00:00 2001 From: Gustaw Lippa Date: Thu, 5 Dec 2024 18:44:13 +0100 Subject: [PATCH 5/7] Add detailed comment and ensure presence is received --- big_tests/tests/sm_SUITE.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/big_tests/tests/sm_SUITE.erl b/big_tests/tests/sm_SUITE.erl index 122deeda01d..e6440001ba6 100644 --- a/big_tests/tests/sm_SUITE.erl +++ b/big_tests/tests/sm_SUITE.erl @@ -553,7 +553,7 @@ resend_unacked_on_reconnection(Config) -> sm_helper:send_messages(Bob, User, Texts), %% User receives the messages. sm_helper:wait_for_messages(User, Texts), - %% User disconnects without acking the messages. + %% User disconnects ending stream gracefully, but without acking the messages. sm_helper:stop_client_and_wait_for_termination(User), %% Messages go to the offline store. %% User receives the messages from the offline store. @@ -562,7 +562,11 @@ resend_unacked_on_reconnection(Config) -> sm_helper:wait_for_delayed_messages(NewUser, Texts), %% User acks the delayed messages so they won't go again %% to the offline store. - escalus_connection:send(NewUser, escalus_stanza:sm_ack(3)). + escalus_connection:send(NewUser, escalus_stanza:sm_ack(3)), + % user receives initial presence response + P = escalus:wait_for_stanza(NewUser), + escalus:assert(is_presence, P), + escalus_connection:stop(NewUser). %% Remove wait_for_n_offline_messages and you will get anything, but preserve_order %% TODO Test without wait_for_n_offline_messages. It would require changes in SM From 16f4aa3d9a48a7500ab86e897beba043a1bed9c5 Mon Sep 17 00:00:00 2001 From: Gustaw Lippa Date: Fri, 6 Dec 2024 10:18:48 +0100 Subject: [PATCH 6/7] Make SM reconnect test more robust The initial presence response and delayed message/presence can come in any order. --- big_tests/tests/sm_SUITE.erl | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/big_tests/tests/sm_SUITE.erl b/big_tests/tests/sm_SUITE.erl index e6440001ba6..a24cd7d6254 100644 --- a/big_tests/tests/sm_SUITE.erl +++ b/big_tests/tests/sm_SUITE.erl @@ -684,15 +684,18 @@ resend_unacked_to_different_res_after_resume_timeout(Config) -> sm_helper:wait_until_resume_session(C2SPid), %% user comes back with different resource - NewUser = connect_spec([{resource, <<"2nd_resource">>} | UserSpec], sr_presence), + NewUser = connect_spec([{resource, <<"2nd_resource">>} | UserSpec], session), + send_initial_presence(NewUser), %% resume timeout passes timer:sleep(timer:seconds(?SHORT_TIMEOUT + 1)), - %% user receives unacked message and presence - UnackedStanzas = escalus:wait_for_stanzas(NewUser, 2), - escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], UnackedStanzas), - [sm_helper:assert_delayed(S) || S <- UnackedStanzas], + %% user receives unacked message and presence, as well as initial presence response + %% the order of the messages may change, especially on CI, so we test all of them + UnackedStanzas = escalus:wait_for_stanzas(NewUser, 3), + escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>), is_presence], UnackedStanzas), + [UnackedMsg] = lists:filter(fun escalus_pred:is_message/1, UnackedStanzas), + sm_helper:assert_delayed(UnackedMsg), escalus_assert:has_no_stanzas(NewUser), From 4415dbeaa942daac6c8808e48ba55db10ec1f1d6 Mon Sep 17 00:00:00 2001 From: Gustaw Lippa Date: Fri, 6 Dec 2024 12:57:25 +0100 Subject: [PATCH 7/7] Extract helper function --- .../mod_stream_management.erl | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/stream_management/mod_stream_management.erl b/src/stream_management/mod_stream_management.erl index 39dbae4b1fa..c92e19f1ef6 100644 --- a/src/stream_management/mod_stream_management.erl +++ b/src/stream_management/mod_stream_management.erl @@ -397,15 +397,7 @@ reroute_buffer(StateData, #sm_state{buffer = Buffer}) -> mongoose_c2s:reroute_buffer(StateData, Buffer). add_delay_elements_to_buffer(#sm_state{buffer = Buffer} = SmState, FromServer) -> - BufferWithDelays = [begin - TS = mongoose_acc:timestamp(Acc), - StanzaName = mongoose_acc:stanza_name(Acc), - StanzaType = mongoose_acc:stanza_type(Acc), - {From, To, El} = mongoose_acc:packet(Acc), - ElWithDelay = maybe_add_timestamp(El, StanzaName, StanzaType, TS, FromServer), - AccParams = #{from_jid => From, to_jid => To, element => ElWithDelay}, - mongoose_acc:update_stanza(AccParams, Acc) - end || Acc <- Buffer], + BufferWithDelays = [maybe_add_timestamp(Acc, FromServer) || Acc <- Buffer], SmState#sm_state{buffer = BufferWithDelays}. -spec terminate(term(), c2s_state(), mongoose_c2s:data()) -> term(). @@ -629,14 +621,20 @@ get_all_stanzas_to_forward(StateData, SMID) -> LServer = mongoose_c2s:get_lserver(StateData), FromServer = jid:make_noprep(<<>>, LServer, <<>>), ToForward = [ begin - TS = mongoose_acc:timestamp(Acc), - Packet = mongoose_acc:element(Acc), - StanzaName = mongoose_acc:stanza_name(Acc), - StanzaType = mongoose_acc:stanza_type(Acc), - maybe_add_timestamp(Packet, StanzaName, StanzaType, TS, FromServer) + AccWithTS = maybe_add_timestamp(Acc, FromServer), + mongoose_acc:element(AccWithTS) end || Acc <- lists:reverse(Buffer)], {Resumed, ToForward}. +maybe_add_timestamp(Acc, FromServer) -> + TS = mongoose_acc:timestamp(Acc), + StanzaName = mongoose_acc:stanza_name(Acc), + StanzaType = mongoose_acc:stanza_type(Acc), + {From, To, El} = mongoose_acc:packet(Acc), + ElWithDelay = maybe_add_timestamp(El, StanzaName, StanzaType, TS, FromServer), + AccParams = #{from_jid => From, to_jid => To, element => ElWithDelay}, + mongoose_acc:update_stanza(AccParams, Acc). + maybe_add_timestamp(Packet, <<"message">>, <<"error">>, _, _) -> Packet; maybe_add_timestamp(Packet, <<"message">>, <<"headline">>, _, _) ->