From 29255a8e861d51867a458da28feac94b6f603e9c Mon Sep 17 00:00:00 2001 From: LPeter1997 Date: Sat, 14 Oct 2023 14:05:31 +0200 Subject: [PATCH] Update JsonRpcConnection.cs --- src/Draco.JsonRpc/JsonRpcConnection.cs | 47 ++++++++++++++++++-------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/src/Draco.JsonRpc/JsonRpcConnection.cs b/src/Draco.JsonRpc/JsonRpcConnection.cs index 21a668a347..d2c4afd314 100644 --- a/src/Draco.JsonRpc/JsonRpcConnection.cs +++ b/src/Draco.JsonRpc/JsonRpcConnection.cs @@ -60,18 +60,6 @@ private sealed class OutgoingRequest : IOutgoingRequest // Communication state private int lastMessageId = 0; - // IO channels - private readonly Channel incomingMessages = Channel.CreateUnbounded(new() - { - SingleReader = true, - SingleWriter = true, - }); - private readonly Channel outgoingMessages = Channel.CreateUnbounded(new() - { - SingleReader = true, - SingleWriter = false, - }); - // Pending requests private readonly ConcurrentDictionary pendingIncomingRequests = new(); private readonly ConcurrentDictionary pendingOutgoingRequests = new(); @@ -79,12 +67,41 @@ private sealed class OutgoingRequest : IOutgoingRequest // Handlers private readonly Dictionary methodHandlers = new(); + // Shutdown + private readonly CancellationTokenSource shutdownTokenSource = new(); + // TODO: Doc public void AddHandler(IJsonRpcMethodHandler handler) => this.methodHandlers.Add(handler.Method, handler); - // TODO: Start listening - // TODO: IO loop - // TODO: Shutdown + // TODO: Doc + public Task StartListening() => Task.Run(async () => + { + while (!this.shutdownTokenSource.IsCancellationRequested) + { + var message = await this.ReadMessageAsync(); + if (message is null) break; + + if (message.IsRequest) + { + var response = await this.ProcessIncomingRequestAsync(message); + this.outgoingMessages.Writer.TryWrite(response); + } + else if (message.IsResponse) + { + this.ProcessIncomingResponse(message); + } + else if (message.IsNotification) + { + await this.ProcessIncomingNotificationAsync(message); + } + else + { + // TODO: What to do? + } + } + }); + + public void Shutdown() => this.shutdownTokenSource.Cancel(); protected int NextMessageId() => Interlocked.Increment(ref this.lastMessageId);