Skip to content

Commit

Permalink
Added missing test for RTN15d, messages should be received after resu…
Browse files Browse the repository at this point in the history
…me success
  • Loading branch information
sacOO7 committed Sep 17, 2023
1 parent ec739de commit 0e9918e
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

namespace IO.Ably.Tests.Infrastructure
{
/// <summary>
/// This class is used for controlling wrapped TestTranport externally.
/// Can only be passed as a clientOption.
/// </summary>
public class TestTransportFactory : ITransportFactory
{
private readonly Action<TestTransportWrapper> _onWrappedTransportCreated;
Expand Down
71 changes: 69 additions & 2 deletions src/IO.Ably.Tests.Shared/Realtime/ConnectionSandBoxSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,77 @@ public async Task ShouldDisconnectIfConnectionIsNotEstablishedWithInDefaultTimeo
await new ConditionalAwaiter(() => disconnectedStateError != null);
}

[Theory]
[ProtocolData]
[Trait("spec", "RTN15d")]
public async Task ResumeRequest_ShouldReceivePendingMessagesOnceConnectionResumed(Protocol protocol)
{
var client1 = await GetRealtimeClient(protocol);
await client1.WaitForState(ConnectionState.Connected);

var client2TransportFactory = new TestTransportFactory();
var client2 = await GetRealtimeClient(protocol, (options, _) =>
{
options.TransportFactory = client2TransportFactory;
});
await client2.WaitForState(ConnectionState.Connected);
var client2InitialConnectionId = client2.Connection.Id;

var channelName = "RTN15d".AddRandomSuffix();
var channelsCount = 5;
for (var i = 0; i < channelsCount; i++)
{
await client1.Channels.Get($"{channelName}_{i}").AttachAsync();
await client2.Channels.Get($"{channelName}_{i}").AttachAsync();
}

var client2MessageAwaiter = new TaskCompletionAwaiter(15000, channelsCount);
var channel2Messages = new List<Message>();
foreach (var realtimeChannel in client2.Channels)
{
realtimeChannel.Subscribe("data", message =>
{
channel2Messages.Add(message);
client2MessageAwaiter.Tick();
});
}

var client2ConnectedAwaiter = new TaskCompletionAwaiter(15000, channelsCount);
client2TransportFactory.BeforeMessageSent += message =>
{
if (message.Action == ProtocolMessage.MessageAction.Connect)
{
client2ConnectedAwaiter.Task.Wait();
}
};
client2.ConnectionManager.Transport.Close(false);
await client2.WaitForState(ConnectionState.Disconnected);

// Publish messages on client1 channels
foreach (var client1Channel in client1.Channels)
{
await client1Channel.PublishAsync("data", "hello");
}

client2ConnectedAwaiter.SetCompleted();
await client2.WaitForState(ConnectionState.Connected);
client2.Connection.ErrorReason.Should().BeNull();
client2.Connection.Id.Should().Be(client2InitialConnectionId); // connection is successfully resumed/recovered

var client2MessagesReceived = await client2MessageAwaiter.Task;
client2MessagesReceived.Should().BeTrue();

channel2Messages.Should().HaveCount(channelsCount);
foreach (var channel2Message in channel2Messages)
{
channel2Message.Data.Should().Be("hello");
}
}

[Theory]
[ProtocolData]
[Trait("spec", "RTN15e")]
public async Task ShouldUpdateConnectionKeyWhenConnectionIsResumed(Protocol protocol)
public async Task ResumeRequest_ShouldUpdateConnectionKeyWhenConnectionIsResumed(Protocol protocol)
{
var client = await GetRealtimeClient(protocol);
await WaitForState(client, ConnectionState.Connected, TimeSpan.FromSeconds(10));
Expand Down Expand Up @@ -574,7 +641,7 @@ async Task CheckForProcessedPendingMessages()
client.State.Connection.Id = string.Empty;
client.State.Connection.Key = "xxxxx!xxxxxxx-xxxxxxxx-xxxxxxxx"; // invalid connection key for next resume request
client.GetTestTransport().Close(false);
await PublishMessagesWhileNotConnected(); // TODO - Fix process pending messages
await PublishMessagesWhileNotConnected();
await CheckForAttachedChannelsAfterResume();
await CheckForProcessedPendingMessages();
client.GetTestTransport().ProtocolMessagesReceived.Count(x => x.Action == ProtocolMessage.MessageAction.Connected).Should().Be(1); // For every new transport, list is reset
Expand Down

0 comments on commit 0e9918e

Please sign in to comment.