Skip to content

Commit

Permalink
reintroduce ConnectionFactory as BeforeConnect
Browse files Browse the repository at this point in the history
- simplify service client configuration
- write tests
  • Loading branch information
eduard-dumitru committed Aug 16, 2024
1 parent 28c6d3c commit 28eecd0
Show file tree
Hide file tree
Showing 29 changed files with 437 additions and 163 deletions.
1 change: 1 addition & 0 deletions NuGet.Config
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
<configuration>
<packageSources>
<add key="Nuget" value="https://api.nuget.org/v3/index.json" />
<add key="UiPath-Internal" value="https://uipath.pkgs.visualstudio.com/Public.Feeds/_packaging/UiPath-Internal/nuget/v3/index.json" />
</packageSources>
</configuration>
11 changes: 0 additions & 11 deletions benchmarks/NuGet.Config

This file was deleted.

2 changes: 1 addition & 1 deletion src/CoreIpc.sln
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{676A208A-2F08-4749-A833-F8D2BCB1B147}"
ProjectSection(SolutionItems) = preProject
Directory.Build.targets = Directory.Build.targets
NuGet.Config = NuGet.Config
..\NuGet.Config = ..\NuGet.Config
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Playground", "Playground\Playground.csproj", "{F0365E40-DA73-4583-A363-89CBEF68A4C6}"
Expand Down
74 changes: 33 additions & 41 deletions src/UiPath.CoreIpc/Client/ServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,8 @@

internal abstract class ServiceClient : IDisposable
{
#region " NonGeneric-Generic adapter cache "
private static readonly MethodInfo GenericDefinition = ((Func<ServiceClient, MethodInfo, object?[], Task<int>>)Invoke<int>).Method.GetGenericMethodDefinition();
private static readonly ConcurrentDictionary<Type, InvokeDelegate> ReturnTypeToInvokeDelegate = new();
private static InvokeDelegate GetInvokeDelegate(Type returnType) => ReturnTypeToInvokeDelegate.GetOrAdd(returnType, CreateInvokeDelegate);
private static InvokeDelegate CreateInvokeDelegate(Type returnType)
=> GenericDefinition.MakeGenericDelegate<InvokeDelegate>(
returnType.IsGenericType
? returnType.GetGenericArguments()[0]
: typeof(object));

private static Task<TResult> Invoke<TResult>(ServiceClient serviceClient, MethodInfo method, object?[] args) => serviceClient.Invoke<TResult>(method, args);
#endregion

protected abstract TimeSpan RequestTimeout { get; }
protected abstract BeforeCallHandler? BeforeCall { get; }
protected abstract ILogger? Log { get; }
protected abstract string DebugName { get; }
protected abstract ISerializer? Serializer { get; }
protected abstract IServiceClientConfig Config { get; }
public abstract Stream? Network { get; }

public event EventHandler? ConnectionClosed;

private readonly Type _interfaceType;
Expand All @@ -33,7 +15,6 @@ protected ServiceClient(Type interfaceType)
}

protected void RaiseConnectionClosed() => ConnectionClosed?.Invoke(this, EventArgs.Empty);

public virtual ValueTask CloseConnection() => throw new NotSupportedException();
public object? Invoke(MethodInfo method, object?[] args) => GetInvokeDelegate(method.ReturnType)(this, method, args);

Expand Down Expand Up @@ -64,7 +45,7 @@ async Task<TResult> Invoke()
{
CancellationToken cancellationToken = default;
TimeSpan messageTimeout = default;
TimeSpan clientTimeout = RequestTimeout;
TimeSpan clientTimeout = Config.RequestTimeout;
Stream? uploadStream = null;
var methodName = method.Name;

Expand All @@ -77,10 +58,10 @@ async Task<TResult> Invoke()

var (connection, newConnection) = await EnsureConnection(ct);

if (BeforeCall is not null)
if (Config.BeforeCall is not null)
{
var callInfo = new CallInfo(newConnection, method, args);
await BeforeCall(callInfo, ct);
await Config.BeforeCall(callInfo, ct);
}

var requestId = connection.NewRequestId();
Expand All @@ -89,11 +70,11 @@ async Task<TResult> Invoke()
UploadStream = uploadStream
};

Log?.ServiceClientCalling(methodName, requestId, DebugName);
Config.Logger?.ServiceClientCalling(methodName, requestId, Config.DebugName);
var response = await connection.RemoteCall(request, ct); // returns user errors instead of throwing them (could throw for system bugs)
Log?.ServiceClientCalled(methodName, requestId, DebugName);
Config.Logger?.ServiceClientCalled(methodName, requestId, Config.DebugName);

return response.Deserialize<TResult>(Serializer);
return response.Deserialize<TResult>(Config.Serializer);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -127,7 +108,7 @@ string[] SerializeArguments()
break;
}

result[index] = Serializer.OrDefault().Serialize(args[index]);
result[index] = Config.Serializer.OrDefault().Serialize(args[index]);
}

return result;
Expand All @@ -142,9 +123,22 @@ public void Dispose()
}
private void Dispose(bool disposing)
{
Log?.ServiceClientDispose(DebugName);
Config.Logger?.ServiceClientDispose(Config.DebugName);
}
public override string ToString() => DebugName;
public override string ToString() => Config.DebugName;

#region Generic adapter cache
private static readonly MethodInfo GenericDefinition = ((Func<ServiceClient, MethodInfo, object?[], Task<int>>)Invoke<int>).Method.GetGenericMethodDefinition();
private static readonly ConcurrentDictionary<Type, InvokeDelegate> ReturnTypeToInvokeDelegate = new();
private static InvokeDelegate GetInvokeDelegate(Type returnType) => ReturnTypeToInvokeDelegate.GetOrAdd(returnType, CreateInvokeDelegate);
private static InvokeDelegate CreateInvokeDelegate(Type returnType)
=> GenericDefinition.MakeGenericDelegate<InvokeDelegate>(
returnType.IsGenericType
? returnType.GetGenericArguments()[0]
: typeof(object));

private static Task<TResult> Invoke<TResult>(ServiceClient serviceClient, MethodInfo method, object?[] args) => serviceClient.Invoke<TResult>(method, args);
#endregion
}

internal sealed class ServiceClientProper : ServiceClient
Expand Down Expand Up @@ -208,10 +202,16 @@ public override async ValueTask CloseConnection()
return (LatestConnection, newlyConnected: false);
}

LatestConnection = new Connection(await Connect(ct), Serializer, Log, DebugName);
if (Config.BeforeConnect is not null)
{
await Config.BeforeConnect(ct);
}

var network = await Connect(ct);
LatestConnection = new Connection(network, Config.Serializer, Config.Logger, Config.DebugName);
var router = new Router(_client.Config.CreateCallbackRouterConfig(), _client.Config.ServiceProvider);
_latestServer = new Server(router, _client.Config.RequestTimeout, LatestConnection);
LatestConnection.Listen().LogException(Log, DebugName);
LatestConnection.Listen().LogException(Config.Logger, Config.DebugName);
return (LatestConnection, newlyConnected: true);
}
}
Expand All @@ -228,11 +228,7 @@ private async Task<Network> Connect(CancellationToken ct)
return network;
}

protected override TimeSpan RequestTimeout => _client.Config.RequestTimeout;
protected override BeforeCallHandler? BeforeCall => _client.Config.BeforeCall;
protected override ILogger? Log => _client.Config.Logger;
protected override string DebugName => _client.Transport.ToString();
protected override ISerializer? Serializer => _client.Config.Serializer;
protected override IServiceClientConfig Config => _client.Config;
}

internal sealed class ServiceClientForCallback : ServiceClient
Expand All @@ -251,9 +247,5 @@ public ServiceClientForCallback(Connection connection, Listener listener, Type i
protected override Task<(Connection connection, bool newlyConnected)> EnsureConnection(CancellationToken ct)
=> Task.FromResult((_connection, newlyConnected: false));

protected override TimeSpan RequestTimeout => _listener.Config.RequestTimeout;
protected override BeforeCallHandler? BeforeCall => null;
protected override ILogger? Log => null;
protected override string DebugName => $"ReverseClient for {_listener}";
protected override ISerializer? Serializer => null;
protected override IServiceClientConfig Config => _listener.Config;
}
10 changes: 8 additions & 2 deletions src/UiPath.CoreIpc/Config/ClientConfig.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
namespace UiPath.Ipc;
using System.ComponentModel;

public sealed record ClientConfig : EndpointConfig
namespace UiPath.Ipc;

public sealed record ClientConfig : EndpointConfig, IServiceClientConfig
{
public EndpointCollection? Callbacks { get; init; }

public IServiceProvider? ServiceProvider { get; init; }
public ILogger? Logger { get; init; }
public BeforeConnectHandler? BeforeConnect { get; init; }
public BeforeCallHandler? BeforeCall { get; init; }
public TaskScheduler? Scheduler { get; init; }
public ISerializer? Serializer { get; set; }

[EditorBrowsable(EditorBrowsableState.Never)]
public string DebugName { get; set; } = null!;

internal void Validate()
{
var haveDeferredInjectedCallbacks = Callbacks?.Any(x => x.Service.MaybeGetServiceProvider() is null && x.Service.MaybeGetInstance() is null) ?? false;
Expand Down
11 changes: 11 additions & 0 deletions src/UiPath.CoreIpc/Config/IServiceClientConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace UiPath.Ipc;

internal interface IServiceClientConfig
{
TimeSpan RequestTimeout { get; }
BeforeConnectHandler? BeforeConnect { get; }
BeforeCallHandler? BeforeCall { get; }
ILogger? Logger { get; }
ISerializer? Serializer { get; }
string DebugName { get; }
}
2 changes: 2 additions & 0 deletions src/UiPath.CoreIpc/Config/IpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ internal void Validate()

Config.Validate();
Transport.Validate();

Config.DebugName ??= Transport.ToString();
}
}
14 changes: 11 additions & 3 deletions src/UiPath.CoreIpc/Config/ListenerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@

namespace UiPath.Ipc;

public abstract record ListenerConfig : EndpointConfig
public abstract record ListenerConfig : EndpointConfig, IServiceClientConfig
{
public int ConcurrentAccepts { get; init; } = 5;
public byte MaxReceivedMessageSizeInMegabytes { get; init; } = 2;
public X509Certificate? Certificate { get; init; }

internal int MaxMessageSize => MaxReceivedMessageSizeInMegabytes * 1024 * 1024;

internal string DebugName => GetType().Name;
internal IEnumerable<string> Validate() => Enumerable.Empty<string>();

internal override RouterConfig CreateRouterConfig(IpcServer server)
Expand All @@ -20,4 +18,14 @@ internal override RouterConfig CreateRouterConfig(IpcServer server)
{
Scheduler = endpoint.Scheduler ?? server.Scheduler
});

#region IServiceClientConfig
/// Do not implement <see cref="IServiceClientConfig.RequestTimeout"/> explicitly, as it must be implicitly implemented by <see cref="EndpointConfig.RequestTimeout"/>.

BeforeConnectHandler? IServiceClientConfig.BeforeConnect => null;
BeforeCallHandler? IServiceClientConfig.BeforeCall => null;
ILogger? IServiceClientConfig.Logger => null;
ISerializer? IServiceClientConfig.Serializer => null!;
string IServiceClientConfig.DebugName => $"CallbackClient for {this}";
#endregion
}
1 change: 1 addition & 0 deletions src/UiPath.CoreIpc/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
global using UiPath.Ipc.Extensibility;
global using BeforeConnectHandler = System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task>;
global using BeforeCallHandler = System.Func<UiPath.Ipc.CallInfo, System.Threading.CancellationToken, System.Threading.Tasks.Task>;
global using InvokeDelegate = System.Func<UiPath.Ipc.ServiceClient, System.Reflection.MethodInfo, object?[], object?>;
global using Accept = System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task<System.Net.WebSockets.WebSocket>>;
Expand Down
16 changes: 7 additions & 9 deletions src/UiPath.CoreIpc/Server/Listener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected Listener(IpcServer server, ListenerConfig config)
{
Config = config;
Server = server;
Logger = server.ServiceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(config.DebugName);
Logger = server.ServiceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(categoryName: config.ToString());
_disposeTask = new(DisposeCore);
}

Expand Down Expand Up @@ -131,27 +131,27 @@ public void LogError(Exception exception, string message)

protected override async Task DisposeCore()
{
Log($"Stopping listener {Config.DebugName}...");
Log($"Stopping listener {Config}...");
_cts.Cancel();
try
{
await _listeningTask;
}
catch (OperationCanceledException ex) when (ex.CancellationToken == _cts.Token)
{
Log($"Stopping listener {Config.DebugName} threw OCE.");
Log($"Stopping listener {Config} threw OCE.");
}
catch (Exception ex)
{
LogError(ex, $"Stopping listener {Config.DebugName} failed.");
LogError(ex, $"Stopping listener {Config} failed.");
}
await State.DisposeAsync();
_cts.Dispose();
}

private async Task Listen(CancellationToken ct)
{
Log($"Starting listener {Config.DebugName}...");
Log($"Starting listener {Config}...");

await Task.WhenAll(Enumerable.Range(1, Config.ConcurrentAccepts).Select(async _ =>
{
Expand All @@ -167,17 +167,15 @@ private async Task AcceptConnection(CancellationToken ct)
try
{
var network = await serverConnection.AcceptClient(ct);
serverConnection.Listen(network, ct).LogException(Logger, Config.DebugName);
serverConnection.Listen(network, ct).LogException(Logger, Config);
}
catch (Exception ex)
{
serverConnection.Dispose();
if (!ct.IsCancellationRequested)
{
Logger.LogException(ex, Config.DebugName);
Logger.LogException(ex, Config);
}
}
}

public override string ToString() => Config.ToString();
}
2 changes: 1 addition & 1 deletion src/UiPath.CoreIpc/Server/ServerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ TCallbackInterface IClient.GetCallback<TCallbackInterface>() where TCallbackInte

TCallbackInterface CreateCallback(Type callbackContract)
{
Listener.Logger.LogInformation($"Create callback {callbackContract} {Listener.Config.DebugName}");
Listener.Logger.LogInformation($"Create callback {callbackContract} {Listener.Config}");

_connectionAsTask ??= Task.FromResult(Connection!);

Expand Down
Loading

0 comments on commit 28eecd0

Please sign in to comment.