From d5d309a997a86de227b8aec9bf06b6e891ece59e Mon Sep 17 00:00:00 2001 From: LPeter1997 Date: Fri, 29 Sep 2023 21:51:01 +0200 Subject: [PATCH] Factored out a new message scheduler --- src/Draco.JsonRpc/Draco.JsonRpc.csproj | 12 +++ src/Draco.JsonRpc/MessageScheduler.cs | 100 +++++++++++++++++++++++++ src/Draco.sln | 10 ++- 3 files changed, 121 insertions(+), 1 deletion(-) create mode 100644 src/Draco.JsonRpc/Draco.JsonRpc.csproj create mode 100644 src/Draco.JsonRpc/MessageScheduler.cs diff --git a/src/Draco.JsonRpc/Draco.JsonRpc.csproj b/src/Draco.JsonRpc/Draco.JsonRpc.csproj new file mode 100644 index 0000000000..bba698281d --- /dev/null +++ b/src/Draco.JsonRpc/Draco.JsonRpc.csproj @@ -0,0 +1,12 @@ + + + + net7.0 + enable + + + + + + + diff --git a/src/Draco.JsonRpc/MessageScheduler.cs b/src/Draco.JsonRpc/MessageScheduler.cs new file mode 100644 index 0000000000..1a7de2e105 --- /dev/null +++ b/src/Draco.JsonRpc/MessageScheduler.cs @@ -0,0 +1,100 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace Draco.JsonRpc; + +/// +/// Implement a message scheduler that reads from a single thread of source, then +/// dispatches the work based on concurrent/exclusive categorization. +/// +/// Concurrent messages can be processed in parallel, while exclusive ones can only be +/// processed sequentially. +/// +/// The message type. +public sealed class MessageScheduler +{ + private enum ConsumerState + { + None, + Concurrent, + Exclusive, + } + + private readonly record struct MessageEntry(TMessage Message, bool IsExclusive); + + private readonly Channel messages = Channel.CreateUnbounded(new() + { + SingleWriter = true, + SingleReader = true, + }); + private readonly Action processMessage; + + public MessageScheduler(Action processMessage) + { + this.processMessage = processMessage; + } + + /// + /// Completes this scheduler, stopping the reader/writer. + /// + public void Complete() => this.messages.Writer.Complete(); + + /// + /// Enqueues the given message for scheduling. + /// + /// The message to schedule. + /// True, if the message should be exclusively scheduled. + /// The task that completes, when the message is enqueued. + public ValueTask Enqueue(TMessage message, bool isExclusive) + { + var entry = new MessageEntry(message, isExclusive); + return this.messages.Writer.WriteAsync(entry); + } + + /// + /// Runs the scheduler's reader-writer loop. + /// + /// Can be used to exit the loop. + /// The task that completes, when the loop stops. + public Task Run(CancellationToken cancellationToken) => Task.Run(async () => + { + var state = ConsumerState.None; + var pendingProcesses = new List(); + while (!cancellationToken.IsCancellationRequested) + { + await foreach (var message in this.messages.Reader.ReadAllAsync()) + { + if (message.IsExclusive) + { + if (state == ConsumerState.Concurrent) + { + // We are in concurrent state, wait for messages to complete + await Task.WhenAll(pendingProcesses); + pendingProcesses.Clear(); + } + // We are now in exclusive state + state = ConsumerState.Exclusive; + // Process synchronously + this.processMessage(message.Message); + } + else + { + // If we get here, we are guaranteed to be done with exclusive message processing + // we can start concurrent processing + + // Add to the pool + pendingProcesses.Add(Task.Run(() => this.processMessage(message.Message))); + state = ConsumerState.Concurrent; + } + } + } + + // Process any remaining concurrent message + await Task.WhenAll(pendingProcesses); + }, cancellationToken); +} diff --git a/src/Draco.sln b/src/Draco.sln index e5ccd44e65..905b0f4d58 100644 --- a/src/Draco.sln +++ b/src/Draco.sln @@ -42,7 +42,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Draco.Dap", "Draco.Dap\Drac EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Draco.DebugAdapter", "Draco.DebugAdapter\Draco.DebugAdapter.csproj", "{9CEC6FB5-720F-42D7-BB74-A3E11A750522}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Draco.Compiler.Benchmarks", "Draco.Compiler.Benchmarks\Draco.Compiler.Benchmarks.csproj", "{067C2F6D-8C70-4BA6-83BE-F7B351D09188}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Draco.Compiler.Benchmarks", "Draco.Compiler.Benchmarks\Draco.Compiler.Benchmarks.csproj", "{067C2F6D-8C70-4BA6-83BE-F7B351D09188}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Draco.JsonRpc", "Draco.JsonRpc\Draco.JsonRpc.csproj", "{A92E1020-A5D7-49F1-8999-FF2178E21F36}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -153,6 +155,12 @@ Global {067C2F6D-8C70-4BA6-83BE-F7B351D09188}.Nuget|Any CPU.Build.0 = Debug|Any CPU {067C2F6D-8C70-4BA6-83BE-F7B351D09188}.Release|Any CPU.ActiveCfg = Release|Any CPU {067C2F6D-8C70-4BA6-83BE-F7B351D09188}.Release|Any CPU.Build.0 = Release|Any CPU + {A92E1020-A5D7-49F1-8999-FF2178E21F36}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A92E1020-A5D7-49F1-8999-FF2178E21F36}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A92E1020-A5D7-49F1-8999-FF2178E21F36}.Nuget|Any CPU.ActiveCfg = Debug|Any CPU + {A92E1020-A5D7-49F1-8999-FF2178E21F36}.Nuget|Any CPU.Build.0 = Debug|Any CPU + {A92E1020-A5D7-49F1-8999-FF2178E21F36}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A92E1020-A5D7-49F1-8999-FF2178E21F36}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE