Skip to content

Commit

Permalink
Fix #414 (#416)
Browse files Browse the repository at this point in the history
* JsonRpcConnection: Pass shutdownCancellationSource.Token in a few more places.

Closes #414.

* JsonRpcConnection: Handle OperationCanceledException in all long-running tasks.

Just extends what's done in ReaderLoopAsync() to WriterLoopAsync() and
ProcessorLoopAsync() as well.
  • Loading branch information
alexrp authored Jul 17, 2024
1 parent 428071a commit daab801
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions src/Draco.JsonRpc/JsonRpcConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private async Task ReaderLoopAsync()
}
else
{
await this.incomingMessages.Writer.WriteAsync(message!);
await this.incomingMessages.Writer.WriteAsync(message!, this.shutdownTokenSource.Token);
}
}
catch (OperationCanceledException oce) when (oce.CancellationToken == this.shutdownTokenSource.Token)
Expand All @@ -132,9 +132,15 @@ private async Task ReaderLoopAsync()

private async Task WriterLoopAsync()
{
await foreach (var message in this.outgoingMessages.Reader.ReadAllAsync(this.shutdownTokenSource.Token))
try
{
await foreach (var message in this.outgoingMessages.Reader.ReadAllAsync(this.shutdownTokenSource.Token))
{
await this.WriteMessageAsync(message);
}
}
catch (OperationCanceledException oce) when (oce.CancellationToken == this.shutdownTokenSource.Token)
{
await this.WriteMessageAsync(message);
}
}

Expand All @@ -147,24 +153,30 @@ bool IsMutating(TMessage message)
&& handler.Mutating;
}

var currentTasks = new List<Task>();
await foreach (var message in this.incomingMessages.Reader.ReadAllAsync(this.shutdownTokenSource.Token))
try
{
if (IsMutating(message))
var currentTasks = new List<Task>();
await foreach (var message in this.incomingMessages.Reader.ReadAllAsync(this.shutdownTokenSource.Token))
{
await Task.WhenAll(currentTasks);
currentTasks.Clear();
if (IsMutating(message))
{
await Task.WhenAll(currentTasks);
currentTasks.Clear();

await this.ProcessMessageAsync(message);
}
else
{
currentTasks.RemoveAll(t => t.IsCompleted);
currentTasks.Add(this.ProcessMessageAsync(message));
await this.ProcessMessageAsync(message);
}
else
{
currentTasks.RemoveAll(t => t.IsCompleted);
currentTasks.Add(this.ProcessMessageAsync(message));
}
}
}

await Task.WhenAll(currentTasks);
await Task.WhenAll(currentTasks);
}
catch (OperationCanceledException oce) when (oce.CancellationToken == this.shutdownTokenSource.Token)
{
}
}
#endregion

Expand Down Expand Up @@ -359,7 +371,7 @@ public async Task SendNotificationAsync(string method, object? @params)
}

protected async Task SendMessageAsync(TMessage message) =>
await this.outgoingMessages.Writer.WriteAsync(message);
await this.outgoingMessages.Writer.WriteAsync(message, this.shutdownTokenSource.Token);

protected void SendMessage(TMessage message) =>
this.outgoingMessages.Writer.TryWrite(message);
Expand Down Expand Up @@ -538,7 +550,7 @@ void WriteData()
}

WriteData();
return writer.FlushAsync();
return writer.FlushAsync(this.shutdownTokenSource.Token);
}
#endregion

Expand Down

0 comments on commit daab801

Please sign in to comment.