Skip to content

Commit

Permalink
Merge pull request #4407 from esl/fix-duplicated-mam-msg
Browse files Browse the repository at this point in the history
Add delays to SM buffer

This PR adds <delay> elements to stanzas rerouted through mod_stream_management. This element was already present when a user requested stream resumption, and had unacknowledged stanzas.

Currently, when a user reconnects, has unacknowledged stanzas, and doesn't request stream resumption, the stream management rerouting behaviour is to try to resend the stanzas to them, because MIM doesn't know if they have reached the user. However, if the user is using a different resource, the stanzas would get rerouted only after resume_timeout passes, which could be surprising without the <delay> element. Even when rerouting to the same resource before the timeout, I think adding the <delay> could help client developers detect retransmitted stanzas.

The <delay> elements sent by SM and mod_presence will now also include a description of where the delay is coming from (this is allowed by https://xmpp.org/extensions/xep-0203.html). This may improve debugging in corner cases.

I've also applied some fixes to the sm_SUITE and reworked the resend_unacked_after_resume_timeout testcase so that it reflects its title. With the new changes it is indeed checking the after_resume_timeout part. I've added a similar case which checks the behaviour for users reconnecting with a different resource.
  • Loading branch information
NelsonVides authored Dec 6, 2024
2 parents 0076a25 + 4415dbe commit cf2de15
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 15 deletions.
70 changes: 62 additions & 8 deletions big_tests/tests/sm_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ groups() ->
{parallel_manual_ack_freq_1, [parallel], parallel_manual_ack_freq_1_cases()},
{manual_ack_freq_2, [], manual_ack_freq_2_cases()},
{stale_h, [], stale_h_cases()},
{parallel_unacknowledged_message_hook, [parallel], parallel_unacknowledged_message_hook_cases()}
{parallel_unacknowledged_message_hook, [parallel], parallel_unacknowledged_message_hook_cases()},
{resume_timeout, [parallel], resume_timeout_cases()}
].

ws_tests() ->
Expand All @@ -72,6 +73,7 @@ ws_tests() ->
{group, manual_ack_freq_2},
{group, stale_h},
{group, parallel_unacknowledged_message_hook},
{group, resume_timeout},
ping_timeout].

tcp_tests() ->
Expand All @@ -80,6 +82,7 @@ tcp_tests() ->
{group, manual_ack_freq_2},
{group, stale_h},
{group, parallel_unacknowledged_message_hook},
{group, resume_timeout},
ping_timeout].

parallel_cases() ->
Expand Down Expand Up @@ -116,7 +119,6 @@ parallel_cases() ->
parallel_manual_ack_freq_1_cases() ->
[client_acks_more_than_sent,
too_many_unacked_stanzas,
resend_unacked_after_resume_timeout,
resume_session_state_send_message_with_ack,
resume_session_state_send_message_without_ack,
resume_session_state_stop_c2s,
Expand All @@ -127,6 +129,10 @@ parallel_manual_ack_freq_1_cases() ->
%% Test cases that make up the manual_ack_freq_2 group.
manual_ack_freq_2_cases() -> [server_requests_ack_freq_2].

%% Test cases that make up the resume_timeout group.
resume_timeout_cases() ->
    [
     resend_unacked_after_resume_timeout,
     resend_unacked_to_different_res_after_resume_timeout
    ].

stale_h_cases() ->
[resume_expired_session_returns_correct_h,
gc_repeat_after_never_means_no_cleaning,
Expand Down Expand Up @@ -258,6 +264,8 @@ required_sm_opts(group, parallel_unacknowledged_message_hook) ->
#{ack_freq => 1};
required_sm_opts(group, manual_ack_freq_long_session_timeout) ->
#{ack_freq => 1, buffer_max => 1000};
required_sm_opts(group, resume_timeout) ->
#{ack_freq => 1, resume_timeout => ?SHORT_TIMEOUT};
required_sm_opts(testcase, resume_expired_session_returns_correct_h) ->
#{ack_freq => 1,
resume_timeout => ?SHORT_TIMEOUT,
Expand Down Expand Up @@ -545,16 +553,20 @@ resend_unacked_on_reconnection(Config) ->
sm_helper:send_messages(Bob, User, Texts),
%% User receives the messages.
sm_helper:wait_for_messages(User, Texts),
%% User disconnects without acking the messages.
%% User disconnects ending stream gracefully, but without acking the messages.
sm_helper:stop_client_and_wait_for_termination(User),
%% Messages go to the offline store.
%% User receives the messages from the offline store.
NewUser = connect_spec(UserSpec, session, manual),
send_initial_presence(NewUser),
sm_helper:wait_for_messages(NewUser, Texts),
sm_helper:wait_for_delayed_messages(NewUser, Texts),
%% User acks the delayed messages so they won't go again
%% to the offline store.
escalus_connection:send(NewUser, escalus_stanza:sm_ack(3)).
escalus_connection:send(NewUser, escalus_stanza:sm_ack(3)),
% user receives initial presence response
P = escalus:wait_for_stanza(NewUser),
escalus:assert(is_presence, P),
escalus_connection:stop(NewUser).

%% Remove wait_for_n_offline_messages and you will get anything, but preserve_order
%% TODO Test without wait_for_n_offline_messages. It would require changes in SM
Expand Down Expand Up @@ -630,20 +642,62 @@ resend_unacked_after_resume_timeout(Config) ->
User = connect_fresh(Config, ?config(user, Config), sr_presence),
UserSpec = sm_helper:client_to_spec(User),

escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)),
escalus_connection:send(Bob, escalus_stanza:chat_to(User, <<"msg-1">>)),
%% kill user connection
escalus_connection:kill(User),

%% ensure there is no session
C2SPid = mongoose_helper:get_session_pid(User),
sm_helper:wait_until_resume_session(C2SPid),

%% user come back and receives unacked message
%% user comes back
NewUser = connect_spec(UserSpec, session),
send_initial_presence(NewUser),

%% resume timeout passes
timer:sleep(timer:seconds(?SHORT_TIMEOUT + 1)),

%% user receives unacked message and initial presence
UnackedStanzas = escalus:wait_for_stanzas(NewUser, 2),
escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)],
escalus:wait_for_stanzas(NewUser, 2)),
UnackedStanzas),
[UnackedMsg] = lists:filter(fun escalus_pred:is_message/1, UnackedStanzas),
sm_helper:assert_delayed(UnackedMsg),
escalus_assert:has_no_stanzas(NewUser),

escalus_connection:stop(Bob),
escalus_connection:stop(NewUser).


%% Checks that a message left unacknowledged on a killed connection is
%% delivered, carrying a delay element, to a session the same user opens with
%% a different resource, once the resume timeout has passed.
resend_unacked_to_different_res_after_resume_timeout(Config) ->
    %% Bob is the sender; the user under test is the receiver
    Sender = connect_fresh(Config, bob, presence),
    Receiver = connect_fresh(Config, ?config(user, Config), sr_presence),
    ReceiverSpec = sm_helper:client_to_spec(Receiver),

    Msg = escalus_stanza:chat_to_short_jid(Receiver, <<"msg-1">>),
    escalus_connection:send(Sender, Msg),
    %% the receiver's connection is killed instead of being closed gracefully
    escalus_connection:kill(Receiver),

    %% wait on the old session process before reconnecting
    OldSessionPid = mongoose_helper:get_session_pid(Receiver),
    sm_helper:wait_until_resume_session(OldSessionPid),

    %% reconnect the same user, this time with another resource
    OtherResSpec = [{resource, <<"2nd_resource">>} | ReceiverSpec],
    Reconnected = connect_spec(OtherResSpec, session),
    send_initial_presence(Reconnected),

    %% let the resume timeout elapse
    timer:sleep(timer:seconds(?SHORT_TIMEOUT + 1)),

    %% Three stanzas are expected: the unacked message, a presence and the
    %% response to the initial presence. Their order is not stable
    %% (especially on CI), so they are matched as a set.
    Received = escalus:wait_for_stanzas(Reconnected, 3),
    ExpectedPreds = [is_presence, is_chat(<<"msg-1">>), is_presence],
    escalus_new_assert:mix_match(ExpectedPreds, Received),
    [DelayedMsg] = [S || S <- Received, escalus_pred:is_message(S)],
    sm_helper:assert_delayed(DelayedMsg),

    %% nothing else may arrive
    escalus_assert:has_no_stanzas(Reconnected),

    escalus_connection:stop(Sender),
    escalus_connection:stop(Reconnected).
Expand Down
10 changes: 10 additions & 0 deletions big_tests/tests/sm_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
-export([send_initial_presence/1,
send_messages/3,
wait_for_messages/2,
wait_for_delayed_messages/2,
assert_delayed/1,
assert_messages/2,
send_and_receive/3,
get_ack/1,
Expand Down Expand Up @@ -293,6 +295,14 @@ wait_for_messages(Alice, Texts) ->
Stanzas = escalus:wait_for_stanzas(Alice, length(Texts)),
assert_messages(Stanzas, Texts).

%% Waits for as many stanzas as there are Texts, checks them against Texts
%% with assert_messages/2 and then runs assert_delayed/1 on each of them.
%% Returns the list of assert_delayed/1 results.
wait_for_delayed_messages(Alice, Texts) ->
    ExpectedCount = length(Texts),
    Received = escalus:wait_for_stanzas(Alice, ExpectedCount),
    assert_messages(Received, Texts),
    lists:map(fun assert_delayed/1, Received).

%% Asserts that the <<"delay">> child of Stanza is in the urn:xmpp:delay
%% namespace.
assert_delayed(Stanza) ->
    DelayEl = exml_query:subelement(Stanza, <<"delay">>),
    escalus:assert(has_ns, [<<"urn:xmpp:delay">>], DelayEl).

assert_messages(Stanzas, Texts) ->
Bodies = lists:map(fun get_body/1, Stanzas),
case Bodies of
Expand Down
2 changes: 1 addition & 1 deletion src/mod_presence.erl
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ route_probe(Acc, Presences, FromJid, ToJid) ->
Packet0 = Presences#presences_state.pres_last,
TS = Presences#presences_state.pres_timestamp,
%% To is the one sending the presence (the target of the probe)
Packet1 = jlib:maybe_append_delay(Packet0, ToJid, TS, <<>>),
Packet1 = jlib:maybe_append_delay(Packet0, ToJid, TS, <<"Delayed presence">>),
HostType = mongoose_acc:host_type(Acc),
Acc2 = mongoose_hooks:presence_probe(HostType, Acc, FromJid, ToJid, self()),
%% Don't route a presence probe to oneself
Expand Down
24 changes: 18 additions & 6 deletions src/stream_management/mod_stream_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,20 @@ maybe_handle_stream_mgmt_reroute(Acc, _StateData, _HostType, _Reason, {error, no
%% Handles termination of a stream-managed session.
%%
%% Removes the SMID entry for the session (do_remove_smid/3), passes the
%% buffered stanzas through add_delay_elements_to_buffer/2 and reroutes the
%% resulting buffer. Returns the SM state with the buffer emptied and
%% buffer_size reset to 0.
handle_user_terminate(#sm_state{counter_in = H} = SmState, StateData, HostType) ->
    Sid = mongoose_c2s:get_sid(StateData),
    do_remove_smid(HostType, Sid, H),
    FromServer = mongoose_c2s:get_lserver(StateData),
    %% The buffer must be rerouted exactly once, and only after the delay
    %% elements were added. Rerouting the raw buffer as well would deliver
    %% every unacknowledged stanza twice, the first copy without a delay.
    DelayedState = add_delay_elements_to_buffer(SmState, FromServer),
    reroute_buffer(StateData, DelayedState),
    SmState#sm_state{buffer = [], buffer_size = 0}.

%% Reroutes the stanzas buffered in the SM state.
%% When the state's peer field holds a {gen_statem, {Pid, _}} tuple, the buffer
%% is handed to that pid; otherwise mongoose_c2s:reroute_buffer/2 is used.
reroute_buffer(C2SData, #sm_state{peer = {gen_statem, {PeerPid, _}}, buffer = Stanzas}) ->
    mongoose_c2s:reroute_buffer_to_pid(C2SData, PeerPid, Stanzas);
reroute_buffer(C2SData, #sm_state{buffer = Stanzas}) ->
    mongoose_c2s:reroute_buffer(C2SData, Stanzas).

%% Returns SmState with every buffered accumulator passed through
%% maybe_add_timestamp/2, using FromServer as the origin of the delay.
add_delay_elements_to_buffer(#sm_state{buffer = Buffer} = SmState, FromServer) ->
    AddDelay = fun(Acc) -> maybe_add_timestamp(Acc, FromServer) end,
    SmState#sm_state{buffer = lists:map(AddDelay, Buffer)}.

-spec terminate(term(), c2s_state(), mongoose_c2s:data()) -> term().
terminate(Reason, C2SState, StateData) ->
?LOG_DEBUG(#{what => stream_mgmt_statem_terminate, reason => Reason,
Expand Down Expand Up @@ -615,14 +621,20 @@ get_all_stanzas_to_forward(StateData, SMID) ->
LServer = mongoose_c2s:get_lserver(StateData),
FromServer = jid:make_noprep(<<>>, LServer, <<>>),
ToForward = [ begin
TS = mongoose_acc:timestamp(Acc),
Packet = mongoose_acc:element(Acc),
StanzaName = mongoose_acc:stanza_name(Acc),
StanzaType = mongoose_acc:stanza_type(Acc),
maybe_add_timestamp(Packet, StanzaName, StanzaType, TS, FromServer)
AccWithTS = maybe_add_timestamp(Acc, FromServer),
mongoose_acc:element(AccWithTS)
end || Acc <- lists:reverse(Buffer)],
{Resumed, ToForward}.

%% Accumulator-level wrapper around maybe_add_timestamp/5.
%% Reads the timestamp, stanza name, stanza type and packet from Acc, passes
%% the element through maybe_add_timestamp/5 and stores the result back in the
%% accumulator with the same sender and recipient.
maybe_add_timestamp(Acc, FromServer) ->
    Timestamp = mongoose_acc:timestamp(Acc),
    Name = mongoose_acc:stanza_name(Acc),
    Type = mongoose_acc:stanza_type(Acc),
    {FromJid, ToJid, Element} = mongoose_acc:packet(Acc),
    Delayed = maybe_add_timestamp(Element, Name, Type, Timestamp, FromServer),
    mongoose_acc:update_stanza(#{from_jid => FromJid,
                                 to_jid => ToJid,
                                 element => Delayed},
                               Acc).

maybe_add_timestamp(Packet, <<"message">>, <<"error">>, _, _) ->
Packet;
maybe_add_timestamp(Packet, <<"message">>, <<"headline">>, _, _) ->
Expand Down

0 comments on commit cf2de15

Please sign in to comment.