Skip to content

Commit

Permalink
Updated resume test for processing pending messages
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Sep 17, 2023
1 parent 52f3a93 commit 2ef3c5f
Showing 1 changed file with 47 additions and 4 deletions.
51 changes: 47 additions & 4 deletions src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,8 @@ await WaitFor(done =>
[ProtocolData]
[Trait("spec", "RTN15c6")]
[Trait("spec", "RTN15c7")]
public async Task ResumeRequest_ConnectedProtocolMessageWithSameOrNewConnectionId_AttachesAllChannels(Protocol protocol)
public async Task ResumeRequest_ConnectedProtocolMessage_AttachAllChannelsAndProcessPendingMessages(
Protocol protocol)
{
var client = await GetRealtimeClient(protocol);
var channelName = "RTN15c6.RTN15c7.".AddRandomSuffix();
Expand All @@ -636,6 +637,31 @@ public async Task ResumeRequest_ConnectedProtocolMessageWithSameOrNewConnectionI
channels.Add(channel as RealtimeChannel);
}

TaskCompletionAwaiter connectedAwaiter = null;

// Should add messages to connection-wide pending queue
async Task PublishMessagesWhileNotConnected()
{
// Make sure new transport is created to apply BeforeProtocolMessageProcessed method
await client.WaitForState(ConnectionState.Connecting);
connectedAwaiter = new TaskCompletionAwaiter(15000);
client.BeforeProtocolMessageProcessed(message =>
{
if (message.Action == ProtocolMessage.MessageAction.Connected)
{
connectedAwaiter.Task.Wait(); // Keep in connecting state
}
});

foreach (var channel in channels)
{
channel.Publish("eventName", "foo");
}

await client.ProcessCommands();
client.State.PendingMessages.Should().HaveCount(channelCount);
}

async Task CheckForAttachedChannelsAfterResume()
{
var attachedChannels = new List<RealtimeChannel>();
Expand All @@ -652,27 +678,44 @@ await WaitForMultiple(channelCount, partialDone =>
});
});
}

connectedAwaiter.SetCompleted();
});
attachedChannels.Should().HaveCount(channelCount);
}

// TODO - check for processed pending messages
async Task CheckForProcessedPendingMessages()
{
client.State.PendingMessages.Should().HaveCount(0);
foreach (var channel in channels)
{
var history = await channel.HistoryAsync();
history.Items.Count.Should().BeGreaterOrEqualTo(1);
history.Items[0].Data.Should().Be("foo");
}
}

// resume successful - RTN15c6
client.GetTestTransport().Close(false);
await PublishMessagesWhileNotConnected();
await CheckForAttachedChannelsAfterResume();
await CheckForProcessedPendingMessages();
client.GetTestTransport().ProtocolMessagesReceived.Count(x => x.Action == ProtocolMessage.MessageAction.Connected).Should().Be(1); // For every new transport, list is reset
var connectedProtocolMessage = client.GetTestTransport().ProtocolMessagesReceived.First(x => x.Action == ProtocolMessage.MessageAction.Connected);
connectedProtocolMessage.ConnectionId.Should().Be(connectionId); // resume successful - RTN15c6
connectedProtocolMessage.ConnectionId.Should().Be(connectionId);
connectedProtocolMessage.Error.Should().BeNull();
client.Connection.ErrorReason.Should().BeNull();

// resume unsuccessful - RTN15c7
client.State.Connection.Id = string.Empty;
client.State.Connection.Key = "xxxxx!xxxxxxx-xxxxxxxx-xxxxxxxx"; // invalid connection key for next resume request
client.GetTestTransport().Close(false);
await PublishMessagesWhileNotConnected(); // TODO - Fix process pending messages
await CheckForAttachedChannelsAfterResume();
await CheckForProcessedPendingMessages();
client.GetTestTransport().ProtocolMessagesReceived.Count(x => x.Action == ProtocolMessage.MessageAction.Connected).Should().Be(1); // For every new transport, list is reset
connectedProtocolMessage = client.GetTestTransport().ProtocolMessagesReceived.First(x => x.Action == ProtocolMessage.MessageAction.Connected);
connectedProtocolMessage.ConnectionId.Should().NotBe(connectionId); // resume unsuccessful - RTN15c7
connectedProtocolMessage.ConnectionId.Should().NotBe(connectionId);
connectedProtocolMessage.Error.Should().NotBeNull();
client.Connection.ErrorReason.ToString().Should().Be(connectedProtocolMessage.Error.ToString());
client.Connection.ErrorReason.Code.Should().Be(ErrorCodes.InvalidFormatForConnectionId);
Expand Down

0 comments on commit 2ef3c5f

Please sign in to comment.