Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event dispatch shouldn't fail when one of the clients disconnects #73

Merged
merged 3 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion CoreRemoting.Tests/RpcTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using CoreRemoting.Channels.Websocket;
using CoreRemoting.DependencyInjection;
using CoreRemoting.Serialization;
using CoreRemoting.Tests.ExternalTypes;
using CoreRemoting.Tests.Tools;
Expand Down Expand Up @@ -163,7 +166,7 @@ void ClientAction()
clientThread.Start();
clientThread.Join();

_serverFixture.Server.Config.MessageEncryption = true;
_serverFixture.Server.Config.MessageEncryption = false;

Assert.True(_remoteServiceCalled);
Assert.Equal(0, _serverFixture.ServerErrorCount);
Expand Down Expand Up @@ -509,5 +512,56 @@ public void NonSerializableError_method_throws_Exception()
_serverFixture.ServerErrorCount = 0;
}
}

[Fact]
public async Task Disposed_client_subscription_doesnt_break_other_clients()
{
async Task roundtrip(bool encryption)
{
var oldEncryption = _serverFixture.Server.Config.MessageEncryption;
_serverFixture.Server.Config.MessageEncryption = encryption;

try
{
RemotingClient createClient() => new RemotingClient(new ClientConfig()
{
ServerPort = _serverFixture.Server.Config.NetworkPort,
MessageEncryption = encryption,
});

using var client1 = createClient();
using var client2 = createClient();

client1.Connect();
client2.Connect();

var proxy1 = client1.CreateProxy<ITestService>();
var fired1 = new TaskCompletionSource<bool>();
proxy1.ServiceEvent += () => fired1.TrySetResult(true);

var proxy2 = client2.CreateProxy<ITestService>();
var fired2 = new TaskCompletionSource<bool>();
proxy2.ServiceEvent += () => fired2.TrySetResult(true);

// early disposal, proxy1 subscription isn't canceled
client1.Disconnect();

proxy2.FireServiceEvent();
Assert.True(await fired2.Task);
Assert.True(fired2.Task.IsCompleted);
Assert.False(fired1.Task.IsCompleted);
}
finally
{
_serverFixture.Server.Config.MessageEncryption = oldEncryption;
}
}

// works!
await roundtrip(encryption: false);

// fails!
await roundtrip(encryption: true);
}
}
}
4 changes: 4 additions & 0 deletions CoreRemoting.Tests/ServerFixture.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using CoreRemoting.DependencyInjection;
using CoreRemoting.Tests.Tools;
using Xunit;
Expand Down Expand Up @@ -65,7 +66,10 @@ public ServerFixture()
public void Dispose()
{
if (Server != null)
{
Thread.Sleep(100); // work around WatsonTcp 6.0.2 bug
Server.Dispose();
}
}
}

Expand Down
30 changes: 22 additions & 8 deletions CoreRemoting/RemotingSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public sealed class RemotingSession : IDisposable
/// Event: Fired before the session is disposed to do some clean up.
/// </summary>
public event Action BeforeDispose;

#endregion

#region Construction
Expand Down Expand Up @@ -70,7 +70,7 @@ internal RemotingSession(int keySize, byte[] clientPublicKey, IRemotingServer se
_delegateProxyCache = new ConcurrentDictionary<Guid, IDelegateProxy>();
_rawMessageTransport = rawMessageTransport ?? throw new ArgumentNullException(nameof(rawMessageTransport));
_clientPublicKeyBlob = clientPublicKey;

_rawMessageTransport.ReceiveMessage += OnReceiveMessage;
_rawMessageTransport.ErrorOccured += OnErrorOccured;

Expand Down Expand Up @@ -122,6 +122,10 @@ internal RemotingSession(int keySize, byte[] clientPublicKey, IRemotingServer se
_remoteDelegateInvocationEventAggregator.RemoteDelegateInvocationNeeded +=
(_, uniqueCallKey, handlerKey, arguments) =>
{
// handle graceful client disconnection
if (_isDisposing)
return null;

var sharedSecret =
MessageEncryption
? _sessionId.ToByteArray()
Expand All @@ -134,7 +138,7 @@ internal RemotingSession(int keySize, byte[] clientPublicKey, IRemotingServer se
HandlerKey = handlerKey,
DelegateArguments = arguments
};

var remoteDelegateInvocationWebsocketMessage =
_server.MessageEncryptionManager
.CreateWireMessage(
Expand All @@ -144,9 +148,19 @@ internal RemotingSession(int keySize, byte[] clientPublicKey, IRemotingServer se
keyPair: _keyPair,
messageType: "invoke");

// Invoke remote delegate on client
_rawMessageTransport?.SendMessage(
_server.Serializer.Serialize(remoteDelegateInvocationWebsocketMessage));
try
{
// Invoke remote delegate on client
_rawMessageTransport?.SendMessage(
_server.Serializer.Serialize(remoteDelegateInvocationWebsocketMessage));
}
catch (Exception ex)
{
// handle unexpected client disconnection
OnErrorOccured("Failed to dispatch the remote event. " +
$"Session: {SessionId}, Unique call key: {uniqueCallKey}, " +
$"Handler key: {handlerKey}", ex);
}

return null;
};
Expand All @@ -162,7 +176,7 @@ internal RemotingSession(int keySize, byte[] clientPublicKey, IRemotingServer se
private void OnErrorOccured(string errorMessage, Exception ex)
{
var exception = new RemotingException(errorMessage, innerEx: ex);

((RemotingServer)_server).OnError(exception);
}

Expand Down Expand Up @@ -705,7 +719,7 @@ private bool MapLinqExpressionArgument(Type argumentType, object argument, out o

return true;
}

#endregion

#region Close session
Expand Down