Skip to content

Commit

Permalink
Update JsonRpcConnection.cs
Browse files Browse the repository at this point in the history
  • Loading branch information
LPeter1997 committed Oct 14, 2023
1 parent 29255a8 commit 80a27f4
Showing 1 changed file with 55 additions and 12 deletions.
67 changes: 55 additions & 12 deletions src/Draco.JsonRpc/JsonRpcConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ private sealed class OutgoingRequest<TResponse> : IOutgoingRequest
public void Complete(object? result) => this.tcs.SetResult((TResponse?)result);
}

private enum ConsumerState
{
None,
Concurrent,
Exclusive,
}

/// <summary>
/// The IO transport pipeline to send and receive messages through.
/// </summary>
Expand Down Expand Up @@ -76,36 +83,72 @@ private sealed class OutgoingRequest<TResponse> : IOutgoingRequest
// TODO: Doc
public Task StartListening() => Task.Run(async () =>
{
var state = ConsumerState.None;
var pendingProcesses = new List<Task>();

while (!this.shutdownTokenSource.IsCancellationRequested)
{
var message = await this.ReadMessageAsync();
if (message is null) break;

if (message.IsRequest)
{
var response = await this.ProcessIncomingRequestAsync(message);
this.outgoingMessages.Writer.TryWrite(response);
}
else if (message.IsResponse)
{
this.ProcessIncomingResponse(message);
}
else if (message.IsNotification)
var isExclusive = this.methodHandlers.TryGetValue(message.Method, out var handler)
? handler.Mutating
: false;

if (isExclusive)
{
await this.ProcessIncomingNotificationAsync(message);
if (state == ConsumerState.Concurrent)
{
// We are in concurrent state, wait for messages to complete
await Task.WhenAll(pendingProcesses);
pendingProcesses.Clear();
}
// We are now in exclusive state
state = ConsumerState.Exclusive;
// Process synchronously
await this.ProcessIncomingMessage(message);
}
else
{
// TODO: What to do?
// If we get here, we are guaranteed to be done with exclusive message processing
// we can start concurrent processing

// Add to the pool
pendingProcesses.Add(Task.Run(() => this.ProcessIncomingMessage(message)));
state = ConsumerState.Concurrent;
}
}

// Process any remaining concurrent message
await Task.WhenAll(pendingProcesses);
});

public void Shutdown() => this.shutdownTokenSource.Cancel();

protected int NextMessageId() => Interlocked.Increment(ref this.lastMessageId);

#region Message Processing
private async Task ProcessIncomingMessage(TMessage message)
{
if (message.IsRequest)
{
var response = await this.ProcessIncomingRequestAsync(message);
this.outgoingMessages.Writer.TryWrite(response);

Check failure on line 136 in src/Draco.JsonRpc/JsonRpcConnection.cs

View workflow job for this annotation

GitHub Actions / tests-ubuntu-latest

'JsonRpcConnection<TMessage>' does not contain a definition for 'outgoingMessages' and no accessible extension method 'outgoingMessages' accepting a first argument of type 'JsonRpcConnection<TMessage>' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 136 in src/Draco.JsonRpc/JsonRpcConnection.cs

View workflow job for this annotation

GitHub Actions / tests-ubuntu-latest

'JsonRpcConnection<TMessage>' does not contain a definition for 'outgoingMessages' and no accessible extension method 'outgoingMessages' accepting a first argument of type 'JsonRpcConnection<TMessage>' could be found (are you missing a using directive or an assembly reference?)

Check failure on line 136 in src/Draco.JsonRpc/JsonRpcConnection.cs

View workflow job for this annotation

GitHub Actions / tests-macOS-latest

'JsonRpcConnection<TMessage>' does not contain a definition for 'outgoingMessages' and no accessible extension method 'outgoingMessages' accepting a first argument of type 'JsonRpcConnection<TMessage>' could be found (are you missing a using directive or an assembly reference?)
}
else if (message.IsResponse)
{
this.ProcessIncomingResponse(message);
}
else if (message.IsNotification)
{
await this.ProcessIncomingNotificationAsync(message);
}
else
{
// TODO: What to do?
}
}

private async Task<TMessage> ProcessIncomingRequestAsync(TMessage message)
{
TMessage Error(object error)
Expand Down

0 comments on commit 80a27f4

Please sign in to comment.