From 2ef3c5f5ab81e8bb56116eb17a6f36d8454b8f0c Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sun, 17 Sep 2023 14:50:00 +0530 Subject: [PATCH] Updated resume test for processing pending messages --- .../Realtime/ConnectionSandBoxSpecs.cs | 51 +++++++++++++++++-- 1 file changed, 47 insertions(+), 4 deletions(-) diff --git a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs index 3cd5c62eb..66e450d35 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs @@ -620,7 +620,8 @@ await WaitFor(done => [ProtocolData] [Trait("spec", "RTN15c6")] [Trait("spec", "RTN15c7")] - public async Task ResumeRequest_ConnectedProtocolMessageWithSameOrNewConnectionId_AttachesAllChannels(Protocol protocol) + public async Task ResumeRequest_ConnectedProtocolMessage_AttachAllChannelsAndProcessPendingMessages( + Protocol protocol) { var client = await GetRealtimeClient(protocol); var channelName = "RTN15c6.RTN15c7.".AddRandomSuffix(); @@ -636,6 +637,31 @@ public async Task ResumeRequest_ConnectedProtocolMessageWithSameOrNewConnectionI channels.Add(channel as RealtimeChannel); } + TaskCompletionAwaiter connectedAwaiter = null; + + // Should add messages to connection-wide pending queue + async Task PublishMessagesWhileNotConnected() + { + // Make sure new transport is created to apply BeforeProtocolMessageProcessed method + await client.WaitForState(ConnectionState.Connecting); + connectedAwaiter = new TaskCompletionAwaiter(15000); + client.BeforeProtocolMessageProcessed(message => + { + if (message.Action == ProtocolMessage.MessageAction.Connected) + { + connectedAwaiter.Task.Wait(); // Keep in connecting state + } + }); + + foreach (var channel in channels) + { + channel.Publish("eventName", "foo"); + } + + await client.ProcessCommands(); + client.State.PendingMessages.Should().HaveCount(channelCount); + } + async Task CheckForAttachedChannelsAfterResume() { var attachedChannels = new List(); @@ -652,27 +678,44 @@ await WaitForMultiple(channelCount, partialDone => }); }); } + + connectedAwaiter.SetCompleted(); }); attachedChannels.Should().HaveCount(channelCount); + } - // TODO - check for processed pending messages + async Task CheckForProcessedPendingMessages() + { + client.State.PendingMessages.Should().HaveCount(0); + foreach (var channel in channels) + { + var history = await channel.HistoryAsync(); + history.Items.Count.Should().BeGreaterOrEqualTo(1); + history.Items[0].Data.Should().Be("foo"); + } } + // resume successful - RTN15c6 client.GetTestTransport().Close(false); + await PublishMessagesWhileNotConnected(); await CheckForAttachedChannelsAfterResume(); + await CheckForProcessedPendingMessages(); client.GetTestTransport().ProtocolMessagesReceived.Count(x => x.Action == ProtocolMessage.MessageAction.Connected).Should().Be(1); // For every new transport, list is reset var connectedProtocolMessage = client.GetTestTransport().ProtocolMessagesReceived.First(x => x.Action == ProtocolMessage.MessageAction.Connected); - connectedProtocolMessage.ConnectionId.Should().Be(connectionId); // resume successful - RTN15c6 + connectedProtocolMessage.ConnectionId.Should().Be(connectionId); connectedProtocolMessage.Error.Should().BeNull(); client.Connection.ErrorReason.Should().BeNull(); + // resume unsuccessful - RTN15c7 client.State.Connection.Id = string.Empty; client.State.Connection.Key = "xxxxx!xxxxxxx-xxxxxxxx-xxxxxxxx"; // invalid connection key for next resume request client.GetTestTransport().Close(false); + await PublishMessagesWhileNotConnected(); // TODO - Fix process pending messages await CheckForAttachedChannelsAfterResume(); + await CheckForProcessedPendingMessages(); client.GetTestTransport().ProtocolMessagesReceived.Count(x => x.Action == ProtocolMessage.MessageAction.Connected).Should().Be(1); // For every new transport, list is reset connectedProtocolMessage = client.GetTestTransport().ProtocolMessagesReceived.First(x => x.Action == ProtocolMessage.MessageAction.Connected); - connectedProtocolMessage.ConnectionId.Should().NotBe(connectionId); // resume unsuccessful - RTN15c7 + connectedProtocolMessage.ConnectionId.Should().NotBe(connectionId); connectedProtocolMessage.Error.Should().NotBeNull(); client.Connection.ErrorReason.ToString().Should().Be(connectedProtocolMessage.Error.ToString()); client.Connection.ErrorReason.Code.Should().Be(ErrorCodes.InvalidFormatForConnectionId);