Skip to content

Commit

Permalink
Update JsonRpcConnection.cs
Browse files Browse the repository at this point in the history
  • Loading branch information
LPeter1997 committed Oct 14, 2023
1 parent 1332913 commit 29255a8
Showing 1 changed file with 32 additions and 15 deletions.
47 changes: 32 additions & 15 deletions src/Draco.JsonRpc/JsonRpcConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,31 +60,48 @@ private sealed class OutgoingRequest<TResponse> : IOutgoingRequest
// Communication state
private int lastMessageId = 0;

// IO channels
private readonly Channel<TMessage> incomingMessages = Channel.CreateUnbounded<TMessage>(new()
{
SingleReader = true,
SingleWriter = true,
});
private readonly Channel<TMessage> outgoingMessages = Channel.CreateUnbounded<TMessage>(new()
{
SingleReader = true,
SingleWriter = false,
});

// Pending requests
private readonly ConcurrentDictionary<object, CancellationTokenSource> pendingIncomingRequests = new();
private readonly ConcurrentDictionary<int, IOutgoingRequest> pendingOutgoingRequests = new();

// Handlers
private readonly Dictionary<string, IJsonRpcMethodHandler> methodHandlers = new();

// Shutdown
private readonly CancellationTokenSource shutdownTokenSource = new();

// TODO: Doc
public void AddHandler(IJsonRpcMethodHandler handler) => this.methodHandlers.Add(handler.Method, handler);

// TODO: Start listening
// TODO: IO loop
// TODO: Shutdown
// TODO: Doc
public Task StartListening() => Task.Run(async () =>
{
while (!this.shutdownTokenSource.IsCancellationRequested)
{
var message = await this.ReadMessageAsync();
if (message is null) break;

if (message.IsRequest)
{
var response = await this.ProcessIncomingRequestAsync(message);
this.outgoingMessages.Writer.TryWrite(response);

Check failure on line 87 in src/Draco.JsonRpc/JsonRpcConnection.cs

View workflow job for this annotation

GitHub Actions / tests-ubuntu-latest

'JsonRpcConnection<TMessage>' does not contain a definition for 'outgoingMessages' and no accessible extension method 'outgoingMessages' accepting a first argument of type 'JsonRpcConnection<TMessage>' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 87 in src/Draco.JsonRpc/JsonRpcConnection.cs

View workflow job for this annotation

GitHub Actions / tests-ubuntu-latest

'JsonRpcConnection<TMessage>' does not contain a definition for 'outgoingMessages' and no accessible extension method 'outgoingMessages' accepting a first argument of type 'JsonRpcConnection<TMessage>' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 87 in src/Draco.JsonRpc/JsonRpcConnection.cs

View workflow job for this annotation

GitHub Actions / tests-macOS-latest

'JsonRpcConnection<TMessage>' does not contain a definition for 'outgoingMessages' and no accessible extension method 'outgoingMessages' accepting a first argument of type 'JsonRpcConnection<TMessage>' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 87 in src/Draco.JsonRpc/JsonRpcConnection.cs

View workflow job for this annotation

GitHub Actions / tests-macOS-latest

'JsonRpcConnection<TMessage>' does not contain a definition for 'outgoingMessages' and no accessible extension method 'outgoingMessages' accepting a first argument of type 'JsonRpcConnection<TMessage>' could be found (are you missing a using directive or an assembly reference?)
}
else if (message.IsResponse)
{
this.ProcessIncomingResponse(message);
}
else if (message.IsNotification)
{
await this.ProcessIncomingNotificationAsync(message);
}
else
{
// TODO: What to do?
}
}
});

public void Shutdown() => this.shutdownTokenSource.Cancel();

protected int NextMessageId() => Interlocked.Increment(ref this.lastMessageId);

Expand Down

0 comments on commit 29255a8

Please sign in to comment.