diff --git a/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs b/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs index 9485b5dcc..30c153552 100644 --- a/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs +++ b/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs @@ -4,6 +4,10 @@ namespace IO.Ably.Tests.Infrastructure { + /// + /// This class is used for controlling wrapped TestTranport externally. + /// Can only be passed as a clientOption. + /// public class TestTransportFactory : ITransportFactory { private readonly Action _onWrappedTransportCreated; diff --git a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs index 3f7753535..d3c41188f 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs @@ -417,10 +417,77 @@ public async Task ShouldDisconnectIfConnectionIsNotEstablishedWithInDefaultTimeo await new ConditionalAwaiter(() => disconnectedStateError != null); } + [Theory] + [ProtocolData] + [Trait("spec", "RTN15d")] + public async Task ResumeRequest_ShouldReceivePendingMessagesOnceConnectionResumed(Protocol protocol) + { + var client1 = await GetRealtimeClient(protocol); + await client1.WaitForState(ConnectionState.Connected); + + var client2TransportFactory = new TestTransportFactory(); + var client2 = await GetRealtimeClient(protocol, (options, _) => + { + options.TransportFactory = client2TransportFactory; + }); + await client2.WaitForState(ConnectionState.Connected); + var client2InitialConnectionId = client2.Connection.Id; + + var channelName = "RTN15d".AddRandomSuffix(); + var channelsCount = 5; + for (var i = 0; i < channelsCount; i++) + { + await client1.Channels.Get($"{channelName}_{i}").AttachAsync(); + await client2.Channels.Get($"{channelName}_{i}").AttachAsync(); + } + + var client2MessageAwaiter = new TaskCompletionAwaiter(15000, channelsCount); + var channel2Messages = new List(); + foreach (var realtimeChannel in client2.Channels) + { + realtimeChannel.Subscribe("data", message => + { + channel2Messages.Add(message); + client2MessageAwaiter.Tick(); + }); + } + + var client2ConnectedAwaiter = new TaskCompletionAwaiter(15000, channelsCount); + client2TransportFactory.BeforeMessageSent += message => + { + if (message.Action == ProtocolMessage.MessageAction.Connect) + { + client2ConnectedAwaiter.Task.Wait(); + } + }; + client2.ConnectionManager.Transport.Close(false); + await client2.WaitForState(ConnectionState.Disconnected); + + // Publish messages on client1 channels + foreach (var client1Channel in client1.Channels) + { + await client1Channel.PublishAsync("data", "hello"); + } + + client2ConnectedAwaiter.SetCompleted(); + await client2.WaitForState(ConnectionState.Connected); + client2.Connection.ErrorReason.Should().BeNull(); + client2.Connection.Id.Should().Be(client2InitialConnectionId); // connection is successfully resumed/recovered + + var client2MessagesReceived = await client2MessageAwaiter.Task; + client2MessagesReceived.Should().BeTrue(); + + channel2Messages.Should().HaveCount(channelsCount); + foreach (var channel2Message in channel2Messages) + { + channel2Message.Data.Should().Be("hello"); + } + } + [Theory] [ProtocolData] [Trait("spec", "RTN15e")] - public async Task ShouldUpdateConnectionKeyWhenConnectionIsResumed(Protocol protocol) + public async Task ResumeRequest_ShouldUpdateConnectionKeyWhenConnectionIsResumed(Protocol protocol) { var client = await GetRealtimeClient(protocol); await WaitForState(client, ConnectionState.Connected, TimeSpan.FromSeconds(10)); @@ -574,7 +641,7 @@ async Task CheckForProcessedPendingMessages() client.State.Connection.Id = string.Empty; client.State.Connection.Key = "xxxxx!xxxxxxx-xxxxxxxx-xxxxxxxx"; // invalid connection key for next resume request client.GetTestTransport().Close(false); - await PublishMessagesWhileNotConnected(); // TODO - Fix process pending messages + await PublishMessagesWhileNotConnected(); await CheckForAttachedChannelsAfterResume(); await CheckForProcessedPendingMessages(); client.GetTestTransport().ProtocolMessagesReceived.Count(x => x.Action == ProtocolMessage.MessageAction.Connected).Should().Be(1); // For every new transport, list is reset