diff --git a/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs b/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs index 3c688e3a8..001dac60b 100644 --- a/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs +++ b/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs @@ -77,6 +77,7 @@ public Task MessageReceived(ProtocolMessage protocolMessage, RealtimeState } else { + channel.Presence.ChannelAttached(protocolMessage); channel.SetChannelState(ChannelState.Attached, protocolMessage); } diff --git a/src/IO.Ably.Shared/Realtime/Presence.cs b/src/IO.Ably.Shared/Realtime/Presence.cs index 3c8e7c03e..5b29ffc7a 100644 --- a/src/IO.Ably.Shared/Realtime/Presence.cs +++ b/src/IO.Ably.Shared/Realtime/Presence.cs @@ -647,7 +647,7 @@ private void EnterMembersFromInternalPresenceMap() { try { - var itemToSend = new PresenceMessage(PresenceAction.Enter, item.ClientId, item.Data, item.Id); + var itemToSend = new PresenceMessage(PresenceAction.Enter, item.ClientId, item.Data); UpdatePresence(itemToSend, (success, info) => { if (!success) diff --git a/src/IO.Ably.Shared/Realtime/RealtimeChannel.cs b/src/IO.Ably.Shared/Realtime/RealtimeChannel.cs index d36ffaa36..c9159ab2b 100644 --- a/src/IO.Ably.Shared/Realtime/RealtimeChannel.cs +++ b/src/IO.Ably.Shared/Realtime/RealtimeChannel.cs @@ -685,7 +685,6 @@ private void HandleStateChange(ChannelState state, ErrorInfo error, ProtocolMess case ChannelState.Attached: _retryCount = 0; AttachResume = true; - Presence.ChannelAttached(protocolMessage); break; case ChannelState.Detached: /* RTL13a check for unexpected detach */ diff --git a/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs index ce63c91bc..2164856b1 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs @@ -310,7 +310,7 @@ public async Task PresenceMapBehaviour_ShouldConformToSpec(Protocol protocol) } } - [Theory(Skip = "Intermittently fails")] + [Theory(Skip = "Intermittently fails, also need to update the test as per spec")] [InlineData(Protocol.Json, 30)] // Wait for 30 seconds [InlineData(Protocol.Json, 60)] // Wait for 1 minute [Trait("spec", "RTP17e")] @@ -420,6 +420,93 @@ async Task WaitForNoPresenceOnChannel(IRestChannel rChannel) } } + [Theory] + [ProtocolData] + [Trait("spec", "RTP17f")] + public async Task OnResumedAttach_ShouldReEnterMembersFromInternalMap(Protocol protocol) + { + var channelName = "RTP17c2".AddRandomSuffix(); + var setupClient = await GetRealtimeClient(protocol); + var setupChannel = setupClient.Channels.Get(channelName); + + // enter 3 client to the channel + for (int i = 0; i < 3; i++) + { + await setupChannel.Presence.EnterClientAsync($"member_{i}", null); + } + + var client = await GetRealtimeClient(protocol, (options, settings) => { options.ClientId = "local"; }); + await client.WaitForState(); + var channel = client.Channels.Get(channelName); + var presence = channel.Presence; + + var p = await presence.GetAsync(); + p.Should().HaveCount(3); + + await presence.EnterAsync(); + + await Task.Delay(250); + presence.MembersMap.Members.Should().HaveCount(4); + presence.InternalMembersMap.Members.Should().HaveCount(1); + + List leaveMessages = new List(); + PresenceMessage updateMessage = null; + PresenceMessage enterMessage = null; + await WaitForMultiple(2, partialDone => + { + presence.Subscribe(PresenceAction.Leave, message => + { + leaveMessages.Add(message); + }); + presence.Subscribe(PresenceAction.Update, message => + { + updateMessage = message; + partialDone(); // 1 call + }); + presence.Subscribe(PresenceAction.Enter, message => + { + enterMessage = message; // not expected to hit + }); + client.GetTestTransport().AfterDataReceived = message => + { + if (message.Action == ProtocolMessage.MessageAction.Attached) + { + bool hasPresence = message.HasFlag(ProtocolMessage.Flag.HasPresence); + hasPresence.Should().BeFalse(); + bool resumed = message.HasFlag(ProtocolMessage.Flag.Resumed); + resumed.Should().BeTrue(); + client.GetTestTransport().AfterDataReceived = _ => { }; + partialDone(); // 1 call + } + }; + // inject duplicate attached message with resume flag ( no RTL12 message loss event) + var protocolMessage = new ProtocolMessage(ProtocolMessage.MessageAction.Attached) + { + Channel = channelName, + Flags = 0, + }; + protocolMessage.SetFlag(ProtocolMessage.Flag.Resumed); + protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed).Should().BeTrue(); + client.GetTestTransport().FakeReceivedMessage(protocolMessage); + }); + + leaveMessages.Should().HaveCount(4); + foreach (var msg in leaveMessages) + { + msg.ClientId.Should().BeOneOf("member_0", "member_1", "member_2", "local"); + } + + updateMessage.Should().NotBeNull(); + updateMessage.ClientId.Should().Be("local"); + enterMessage.Should().BeNull(); + + presence.Unsubscribe(); + var remainingMembers = await presence.GetAsync(); + + remainingMembers.Should().HaveCount(1); + remainingMembers.First().ClientId.Should().Be("local"); + } + [Theory] [ProtocolData] [Trait("spec", "RTP17")]