Skip to content

Commit

Permalink
Shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
LPeter1997 committed Oct 19, 2023
1 parent f727242 commit 9e5731d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 10 deletions.
5 changes: 5 additions & 0 deletions src/Draco.JsonRpc/IJsonRpcConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public interface IJsonRpcConnection
/// <returns>The task that completes when the connection closes.</returns>
public Task ListenAsync();

/// <summary>
/// Shuts down this connection.
/// </summary>
public void Shutdown();

// TODO: Doc
public Task<TResponse?> SendRequestAsync<TResponse>(string method, object? @params);

Expand Down
37 changes: 29 additions & 8 deletions src/Draco.JsonRpc/JsonRpcConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text.Json;
using System.Threading.Channels;
using System.Collections.Concurrent;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace Draco.JsonRpc;

Expand Down Expand Up @@ -85,6 +86,9 @@ private sealed class OutgoingRequest<TResponse> : IOutgoingRequest
private readonly ConcurrentDictionary<object, CancellationTokenSource> pendingIncomingRequests = new();
private readonly ConcurrentDictionary<int, IOutgoingRequest> pendingOutgoingRequests = new();

// Shutdown
private readonly CancellationTokenSource shutdownTokenSource = new();

// Communication state
private int lastMessageId = 0;

Expand All @@ -95,30 +99,47 @@ public Task ListenAsync() => Task.WhenAll(
this.WriterLoopAsync(),
this.ProcessorLoopAsync());

public void Shutdown() => this.shutdownTokenSource.Cancel();

protected int NextMessageId() => Interlocked.Increment(ref this.lastMessageId);

#region Message Loops
private async Task ReaderLoopAsync()
{
while (true)
{
var (message, foundMessage) = await this.ReadMessageAsync();
if (!foundMessage) break;
try
{
var (message, foundMessage) = await this
.ReadMessageAsync()
.WaitAsync(this.shutdownTokenSource.Token);
if (!foundMessage) break;

if (TMessageAdapter.IsResponse(message!))
if (TMessageAdapter.IsResponse(message!))
{
this.ProcessIncomingResponse(message!);
}
else
{
await this.incomingMessages.Writer.WriteAsync(message!);
}
}
catch (OperationCanceledException oce) when (oce.CancellationToken == this.shutdownTokenSource.Token)
{
this.ProcessIncomingResponse(message!);
break;
}
else
catch (JsonException ex)
{
await this.incomingMessages.Writer.WriteAsync(message!);
var error = TMessageAdapter.CreateJsonExceptionError(ex);
await this.outgoingMessages.Writer.WriteAsync(TMessageAdapter.CreateErrorResponse(null!, error));
continue;
}
}
}

private async Task WriterLoopAsync()
{
await foreach (var message in this.outgoingMessages.Reader.ReadAllAsync())
await foreach (var message in this.outgoingMessages.Reader.ReadAllAsync(this.shutdownTokenSource.Token))
{
await this.WriteMessageAsync(message);
}
Expand All @@ -134,7 +155,7 @@ bool IsMutating(TMessage message)
}

var currentTasks = new List<Task>();
await foreach (var message in this.incomingMessages.Reader.ReadAllAsync())
await foreach (var message in this.incomingMessages.Reader.ReadAllAsync(this.shutdownTokenSource.Token))
{
if (IsMutating(message))
{
Expand Down
3 changes: 1 addition & 2 deletions src/Draco.Lsp/Server/LanguageServerLifecycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ public async Task InitializedAsync(InitializedParams param)

public Task ExitAsync()
{
// TODO
// this.connection.Shutdown();
this.connection.Shutdown();
return Task.CompletedTask;
}

Expand Down

0 comments on commit 9e5731d

Please sign in to comment.