From daab8012c4c3fdd74ad427d020e7fee554e202cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20R=C3=B8nne=20Petersen?= Date: Wed, 17 Jul 2024 17:07:39 +0200 Subject: [PATCH] Fix #414 (#416) * JsonRpcConnection: Pass shutdownCancellationSource.Token in a few more places. Closes #414. * JsonRpcConnection: Handle OperationCanceledException in all long-running tasks. Just extends what's done in ReaderLoopAsync() to WriterLoopAsync() and ProcessorLoopAsync() as well. --- src/Draco.JsonRpc/JsonRpcConnection.cs | 48 ++++++++++++++++---------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/src/Draco.JsonRpc/JsonRpcConnection.cs b/src/Draco.JsonRpc/JsonRpcConnection.cs index da0f20dbb..f355a0412 100644 --- a/src/Draco.JsonRpc/JsonRpcConnection.cs +++ b/src/Draco.JsonRpc/JsonRpcConnection.cs @@ -114,7 +114,7 @@ private async Task ReaderLoopAsync() } else { - await this.incomingMessages.Writer.WriteAsync(message!); + await this.incomingMessages.Writer.WriteAsync(message!, this.shutdownTokenSource.Token); } } catch (OperationCanceledException oce) when (oce.CancellationToken == this.shutdownTokenSource.Token) @@ -132,9 +132,15 @@ private async Task ReaderLoopAsync() private async Task WriterLoopAsync() { - await foreach (var message in this.outgoingMessages.Reader.ReadAllAsync(this.shutdownTokenSource.Token)) + try + { + await foreach (var message in this.outgoingMessages.Reader.ReadAllAsync(this.shutdownTokenSource.Token)) + { + await this.WriteMessageAsync(message); + } + } + catch (OperationCanceledException oce) when (oce.CancellationToken == this.shutdownTokenSource.Token) { - await this.WriteMessageAsync(message); } } @@ -147,24 +153,30 @@ bool IsMutating(TMessage message) && handler.Mutating; } - var currentTasks = new List(); - await foreach (var message in this.incomingMessages.Reader.ReadAllAsync(this.shutdownTokenSource.Token)) + try { - if (IsMutating(message)) + var currentTasks = new List(); + await foreach (var message in this.incomingMessages.Reader.ReadAllAsync(this.shutdownTokenSource.Token)) { - await Task.WhenAll(currentTasks); - currentTasks.Clear(); + if (IsMutating(message)) + { + await Task.WhenAll(currentTasks); + currentTasks.Clear(); - await this.ProcessMessageAsync(message); - } - else - { - currentTasks.RemoveAll(t => t.IsCompleted); - currentTasks.Add(this.ProcessMessageAsync(message)); + await this.ProcessMessageAsync(message); + } + else + { + currentTasks.RemoveAll(t => t.IsCompleted); + currentTasks.Add(this.ProcessMessageAsync(message)); + } } - } - await Task.WhenAll(currentTasks); + await Task.WhenAll(currentTasks); + } + catch (OperationCanceledException oce) when (oce.CancellationToken == this.shutdownTokenSource.Token) + { + } } #endregion @@ -359,7 +371,7 @@ public async Task SendNotificationAsync(string method, object? @params) } protected async Task SendMessageAsync(TMessage message) => - await this.outgoingMessages.Writer.WriteAsync(message); + await this.outgoingMessages.Writer.WriteAsync(message, this.shutdownTokenSource.Token); protected void SendMessage(TMessage message) => this.outgoingMessages.Writer.TryWrite(message); @@ -538,7 +550,7 @@ void WriteData() } WriteData(); - return writer.FlushAsync(); + return writer.FlushAsync(this.shutdownTokenSource.Token); } #endregion