Skip to content

Commit

Permalink
simplify Ipc client api
Browse files Browse the repository at this point in the history
  • Loading branch information
eduard-dumitru committed Aug 15, 2024
1 parent b5ce48c commit 28c6d3c
Show file tree
Hide file tree
Showing 27 changed files with 390 additions and 262 deletions.
36 changes: 24 additions & 12 deletions src/Clients/js/dotnet/UiPath.CoreIpc.NodeInterop/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,30 +120,42 @@ IEnumerable<Task> EnumeratePings()
{
if (webSocketUrl is not null)
{
yield return new WebSocketClient()
yield return new IpcClient
{
Uri = new(webSocketUrl),
ServiceProvider = sp,
RequestTimeout = TimeSpan.FromHours(5),
Callbacks = new()
Config = new()
{
{ typeof(IArithmetic), callback }
ServiceProvider = sp,
RequestTimeout = TimeSpan.FromHours(5),
Callbacks = new()
{
{ typeof(IArithmetic), callback }
},
},
Transport = new WebSocketTransport
{
Uri = new(webSocketUrl),
}
}
.GetProxy<IAlgebra>()
.Ping();
}

if (pipeName is not null)
{
yield return new NamedPipeClient()
yield return new IpcClient
{
PipeName = pipeName,
ServiceProvider = sp,
RequestTimeout = TimeSpan.FromHours(5),
Callbacks = new()
Config = new()
{
ServiceProvider = sp,
RequestTimeout = TimeSpan.FromHours(5),
Callbacks = new()
{
{ typeof(IArithmetic), callback }
}
},
Transport = new NamedPipeTransport()
{
{ typeof(IArithmetic), callback }
PipeName = pipeName,
}
}
.GetProxy<IAlgebra>()
Expand Down
76 changes: 64 additions & 12 deletions src/Playground/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,14 @@ private static async Task Main(string[] args)
ServiceProvider = serverSP,
Endpoints = new()
{
typeof(Contracts.IServerOperations),
typeof(Contracts.IServerOperations), // DEVINE
new EndpointSettings(typeof(Contracts.IServerOperations)) // ASTALALT
{
BeforeCall = async (callInfo, _) =>
{
Console.WriteLine($"Server: {callInfo.Method.Name}");
}
},
typeof(Contracts.IClientOperations2)
},
Listeners = [
Expand Down Expand Up @@ -73,21 +80,66 @@ private static async Task Main(string[] args)
throw;
}

var proxy1 = new NamedPipeClient()
var c1 = new IpcClient()
{
PipeName = Contracts.PipeName,
ServerName = ".",
AllowImpersonation = false,
Config = new()
{
Callbacks = new()
{
typeof(Contracts.IClientOperations),
{ typeof(Contracts.IClientOperations2), new Impl.Client2() },
},
ServiceProvider = clientSP,
Scheduler = clientScheduler,
},
Transport = new NamedPipeTransport()
{
PipeName = Contracts.PipeName,
ServerName = ".",
AllowImpersonation = false,
},
};

var c2 = new IpcClient()
{
Config = new()
{
ServiceProvider = clientSP,
Callbacks = new()
{
typeof(Contracts.IClientOperations),
{ typeof(Contracts.IClientOperations2), new Impl.Client2() },
},
Scheduler = clientScheduler,
},
Transport = new NamedPipeTransport()
{
PipeName = Contracts.PipeName,
ServerName = ".",
AllowImpersonation = false,
},
};

ServiceProvider = clientSP,
Callbacks = new()
var proxy1 = new IpcClient()
{
Config = new()
{
typeof(Contracts.IClientOperations),
{ typeof(Contracts.IClientOperations2), new Impl.Client2() }
ServiceProvider = clientSP,
Callbacks = new()
{
typeof(Contracts.IClientOperations),
{ typeof(Contracts.IClientOperations2), new Impl.Client2() },
},
Scheduler = clientScheduler,
},
Scheduler = clientScheduler
}
.GetProxy<Contracts.IServerOperations>();
Transport = new NamedPipeTransport()
{
PipeName = Contracts.PipeName,
ServerName = ".",
AllowImpersonation = false,
},
}.GetProxy<Contracts.IServerOperations>();


await proxy1.Register();
await proxy1.Broadcast("Hello Bidirectional Http!");
Expand Down
21 changes: 21 additions & 0 deletions src/UiPath.CoreIpc/Client/IpcProxy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
namespace UiPath.Ipc;

public class IpcProxy : DispatchProxy, IDisposable
{
internal ServiceClient ServiceClient { get; set; } = null!;

protected override object? Invoke(MethodInfo? targetMethod, object?[]? args)
=> ServiceClient.Invoke(targetMethod!, args!);

public void Dispose() => ServiceClient?.Dispose();

public ValueTask CloseConnection() => ServiceClient.CloseConnection();

public event EventHandler ConnectionClosed
{
add => ServiceClient.ConnectionClosed += value;
remove => ServiceClient.ConnectionClosed -= value;
}

public Stream? Network => ServiceClient.Network;
}
91 changes: 12 additions & 79 deletions src/UiPath.CoreIpc/Client/ServiceClient.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
namespace UiPath.Ipc;

using System.Linq.Expressions;
using ServiceClientProperFactory = Func<ClientBase, Type, ServiceClient>;

internal abstract class ServiceClient : IDisposable
{
#region " NonGeneric-Generic adapter cache "
Expand All @@ -19,7 +16,6 @@ private static InvokeDelegate CreateInvokeDelegate(Type returnType)
#endregion

protected abstract TimeSpan RequestTimeout { get; }
protected abstract ConnectionFactory? ConnectionFactory { get; }
protected abstract BeforeCallHandler? BeforeCall { get; }
protected abstract ILogger? Log { get; }
protected abstract string DebugName { get; }
Expand Down Expand Up @@ -151,47 +147,11 @@ private void Dispose(bool disposing)
public override string ToString() => DebugName;
}

internal static class ServiceClientProper
{
private static readonly ConcurrentDictionary<Type, ServiceClientProperFactory> CachedFactories = new();
private static ServiceClientProperFactory GetFactory(Type clientType) => CachedFactories.GetOrAdd(clientType, CreateFactory);
private static ServiceClientProperFactory CreateFactory(Type clientType)
{
if (clientType
.GetInterfaces()
.SingleOrDefault(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(IClient<,>))
?.GetGenericArguments()
is not [var clientStateType, var clientType2] || clientType2 != clientType)
{
throw new ArgumentOutOfRangeException(nameof(clientType), "The client implements 0 or more than 1 IClient<,> interfaces or the single interface's 2nd generic argument is not the client type itself.");
}

var ctor = typeof(ServiceClientProper<,>)
.MakeGenericType(clientType, clientStateType)
.GetConstructor([clientType, typeof(Type)])!;

var paramofClientBase = Expression.Parameter(typeof(ClientBase));
var paramofType = Expression.Parameter(typeof(Type));
return Expression.Lambda<ServiceClientProperFactory>(
Expression.New(
ctor,
Expression.Convert(paramofClientBase, clientType),
paramofType),
paramofClientBase,
paramofType).Compile();
}

public static ServiceClient Create(ClientBase client, Type proxyType)
=> GetFactory(client.GetType())(client, proxyType);
}

internal sealed class ServiceClientProper<TClient, TClientState> : ServiceClient
where TClient : ClientBase, IClient<TClientState, TClient>
where TClientState : class, IClientState<TClient, TClientState>, new()
internal sealed class ServiceClientProper : ServiceClient
{
private readonly FastAsyncLock _lock = new();
private readonly TClientState _clientState = new();
private readonly TClient _client;
private readonly IpcClient _client;
private readonly IClientState _clientState;

private Connection? _latestConnection;
private Server? _latestServer;
Expand Down Expand Up @@ -222,9 +182,10 @@ private Connection? LatestConnection

public override Stream? Network => LatestConnection?.Network;

public ServiceClientProper(TClient client, Type interfaceType) : base(interfaceType)
public ServiceClientProper(IpcClient client, Type interfaceType) : base(interfaceType)
{
_client = client;
_clientState = client.Transport.CreateState();
}

public override async ValueTask CloseConnection()
Expand All @@ -248,21 +209,15 @@ public override async ValueTask CloseConnection()
}

LatestConnection = new Connection(await Connect(ct), Serializer, Log, DebugName);
var router = new Router(_client.CreateCallbackRouterConfig(), _client.ServiceProvider);
_latestServer = new Server(router, _client.RequestTimeout, LatestConnection);
var router = new Router(_client.Config.CreateCallbackRouterConfig(), _client.Config.ServiceProvider);
_latestServer = new Server(router, _client.Config.RequestTimeout, LatestConnection);
LatestConnection.Listen().LogException(Log, DebugName);
return (LatestConnection, newlyConnected: true);
}
}

private async Task<Network> Connect(CancellationToken ct)
{
if (ConnectionFactory is not null
&& await ConnectionFactory(_clientState.Network, ct) is { } userProvidedNetwork)
{
return userProvidedNetwork;
}

await _clientState.Connect(_client, ct);

if (_clientState.Network is not { } network)
Expand All @@ -273,12 +228,11 @@ private async Task<Network> Connect(CancellationToken ct)
return network;
}

protected override TimeSpan RequestTimeout => _client.RequestTimeout;
protected override ConnectionFactory? ConnectionFactory => _client.ConnectionFactory;
protected override BeforeCallHandler? BeforeCall => _client.BeforeCall;
protected override ILogger? Log => _client.Logger;
protected override string DebugName => _client.ToString();
protected override ISerializer? Serializer => _client.Serializer;
protected override TimeSpan RequestTimeout => _client.Config.RequestTimeout;
protected override BeforeCallHandler? BeforeCall => _client.Config.BeforeCall;
protected override ILogger? Log => _client.Config.Logger;
protected override string DebugName => _client.Transport.ToString();
protected override ISerializer? Serializer => _client.Config.Serializer;
}

internal sealed class ServiceClientForCallback : ServiceClient
Expand All @@ -298,29 +252,8 @@ public ServiceClientForCallback(Connection connection, Listener listener, Type i
=> Task.FromResult((_connection, newlyConnected: false));

protected override TimeSpan RequestTimeout => _listener.Config.RequestTimeout;
protected override ConnectionFactory? ConnectionFactory => null;
protected override BeforeCallHandler? BeforeCall => null;
protected override ILogger? Log => null;
protected override string DebugName => $"ReverseClient for {_listener}";
protected override ISerializer? Serializer => null;
}

public class IpcProxy : DispatchProxy, IDisposable
{
internal ServiceClient ServiceClient { get; set; } = null!;

protected override object? Invoke(MethodInfo? targetMethod, object?[]? args)
=> ServiceClient.Invoke(targetMethod!, args!);

public void Dispose() => ServiceClient?.Dispose();

public ValueTask CloseConnection() => ServiceClient.CloseConnection();

public event EventHandler ConnectionClosed
{
add => ServiceClient.ConnectionClosed += value;
remove => ServiceClient.ConnectionClosed -= value;
}

public Stream? Network => ServiceClient.Network;
}
Original file line number Diff line number Diff line change
@@ -1,33 +1,23 @@
namespace UiPath.Ipc;

public abstract record ClientBase : EndpointConfig
public sealed record ClientConfig : EndpointConfig
{
private readonly ConcurrentDictionary<Type, ServiceClient> _clients = new();
private ServiceClient GetServiceClient(Type proxyType) => _clients.GetOrAdd(proxyType, ServiceClientProper.Create(this, proxyType));
public EndpointCollection? Callbacks { get; init; }

public IServiceProvider? ServiceProvider { get; init; }
public EndpointCollection? Callbacks { get; init; }
public ILogger? Logger { get; init; }
public ConnectionFactory? ConnectionFactory { get; init; }
public BeforeCallHandler? BeforeCall { get; init; }
public TaskScheduler? Scheduler { get; init; }
public ISerializer? Serializer { get; set; }

public virtual void Validate() { }

public TProxy GetProxy<TProxy>() where TProxy : class
=> GetServiceClient(typeof(TProxy)).GetProxy<TProxy>();

internal void ValidateInternal()
internal void Validate()
{
var haveDeferredInjectedCallbacks = Callbacks?.Any(x => x.Service.MaybeGetServiceProvider() is null && x.Service.MaybeGetInstance() is null) ?? false;

if (haveDeferredInjectedCallbacks && ServiceProvider is null)
{
throw new InvalidOperationException("ServiceProvider is required when you register injectable callbacks. Consider registering a callback instance.");
}

Validate();
}

internal ILogger? GetLogger(string name)
Expand Down Expand Up @@ -55,16 +45,10 @@ internal override RouterConfig CreateCallbackRouterConfig()
});
}

public interface IClient<TState, TSelf>
where TSelf : ClientBase, IClient<TState, TSelf>
where TState : class, IClientState<TSelf, TState>, new() { }

public interface IClientState<TClient, TSelf> : IDisposable
where TSelf : class, IClientState<TClient, TSelf>, new()
where TClient : ClientBase, IClient<TSelf, TClient>
public interface IClientState : IDisposable
{
Network? Network { get; }

bool IsConnected();
ValueTask Connect(TClient client, CancellationToken ct);
ValueTask Connect(IpcClient client, CancellationToken ct);
}
7 changes: 7 additions & 0 deletions src/UiPath.CoreIpc/Config/ClientTransport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace UiPath.Ipc;

public abstract record ClientTransport
{
public abstract IClientState CreateState();
public abstract void Validate();
}
Loading

0 comments on commit 28c6d3c

Please sign in to comment.