diff --git a/samples/Monitor/Program.cs b/samples/Monitor/Program.cs index 808e4674..1d08275c 100644 --- a/samples/Monitor/Program.cs +++ b/samples/Monitor/Program.cs @@ -3,51 +3,10 @@ string address = Address.Session ?? throw new ArgumentNullException(nameof(address)); -await foreach (var dmsg in MonitorMessagesAsync(address)) +await foreach (DisposableMessage dmsg in Connection.MonitorBusAsync(address)) { using var _ = dmsg; Message msg = dmsg.Message; Console.WriteLine($"{msg.MessageType} {msg.SenderAsString} -> {msg.DestinationAsString}"); } - -IAsyncEnumerable MonitorMessagesAsync(string address) -{ - var channel = Channel.CreateUnbounded(); - - WriteMessagesToChannel(address, channel.Writer); - - return channel.Reader.ReadAllAsync(); - - static async void WriteMessagesToChannel(string address, ChannelWriter writer) - { - try - { - using var connection = new Connection(address); - await connection.ConnectAsync(); - - await connection.BecomeMonitorAsync( - (Exception? ex, DisposableMessage message) => - { - if (ex is not null) - { - writer.TryComplete(ex); - return; - } - - if (!writer.TryWrite(message)) - { - message.Dispose(); - } - } - ); - - Exception? ex = await connection.DisconnectedAsync(); - writer.TryComplete(ex); - } - catch (Exception ex) - { - writer.TryComplete(ex); - } - } -} \ No newline at end of file diff --git a/src/Tmds.DBus.Protocol/Connection.DBus.cs b/src/Tmds.DBus.Protocol/Connection.DBus.cs index 798edcf2..f27ad51c 100644 --- a/src/Tmds.DBus.Protocol/Connection.DBus.cs +++ b/src/Tmds.DBus.Protocol/Connection.DBus.cs @@ -1,3 +1,5 @@ +using System.Threading.Channels; + namespace Tmds.DBus.Protocol; public partial class Connection @@ -31,4 +33,65 @@ public async Task BecomeMonitorAsync(Action handl DBusConnection connection = await ConnectCoreAsync().ConfigureAwait(false); await connection.BecomeMonitorAsync(handler, rules).ConfigureAwait(false); } + + public static async IAsyncEnumerable MonitorBusAsync(string address, IEnumerable? rules = null, [EnumeratorCancellation]CancellationToken ct = default) + { + ct.ThrowIfCancellationRequested(); + + var channel = Channel.CreateUnbounded( + new UnboundedChannelOptions() + { + AllowSynchronousContinuations = true, + SingleReader = true, + SingleWriter = true, + } + ); + + using var connection = new Connection(address); + using CancellationTokenRegistration ctr = +#if NETCOREAPP3_1_OR_GREATER + ct.UnsafeRegister(c => ((Connection)c!).Dispose(), connection); +#else + ct.Register(c => ((Connection)c!).Dispose(), connection); +#endif + try + { + await connection.ConnectAsync().ConfigureAwait(false); + + await connection.BecomeMonitorAsync( + (Exception? ex, DisposableMessage message) => + { + if (ex is not null) + { + if (ct.IsCancellationRequested) + { + ex = new OperationCanceledException(ct); + } + channel.Writer.TryComplete(ex); + return; + } + + if (!channel.Writer.TryWrite(message)) + { + message.Dispose(); + } + }, + rules + ).ConfigureAwait(false); + } + catch + { + ct.ThrowIfCancellationRequested(); + + throw; + } + + while (await channel.Reader.WaitToReadAsync().ConfigureAwait(false)) + { + if (channel.Reader.TryRead(out DisposableMessage msg)) + { + yield return msg; + } + } + } } \ No newline at end of file diff --git a/src/Tmds.DBus.Protocol/DBusConnection.cs b/src/Tmds.DBus.Protocol/DBusConnection.cs index cc807ac9..ccfe53eb 100644 --- a/src/Tmds.DBus.Protocol/DBusConnection.cs +++ b/src/Tmds.DBus.Protocol/DBusConnection.cs @@ -359,7 +359,7 @@ private async void HandleMessages(Exception? exception, Message message) bool runHandlerSynchronously = methodHandler.RunMethodHandlerSynchronously(message); if (runHandlerSynchronously) { - await methodHandler.HandleMethodAsync(context); + await methodHandler.HandleMethodAsync(context).ConfigureAwait(false); SendUnknownMethodErrorIfNoReplySent(context); } else @@ -406,7 +406,7 @@ private async void RunMethodHandler(IMethodHandler methodHandler, MethodContext { try { - await methodHandler.HandleMethodAsync(context); + await methodHandler.HandleMethodAsync(context).ConfigureAwait(false); SendUnknownMethodErrorIfNoReplySent(context); context.Request.ReturnToPool(); } diff --git a/src/Tmds.DBus.Protocol/Netstandard2_0Extensions.cs b/src/Tmds.DBus.Protocol/Netstandard2_0Extensions.cs index 2946863f..ea65bc33 100644 --- a/src/Tmds.DBus.Protocol/Netstandard2_0Extensions.cs +++ b/src/Tmds.DBus.Protocol/Netstandard2_0Extensions.cs @@ -97,7 +97,7 @@ public static unsafe string AsString(this Span chars) public static async ValueTask ReceiveAsync(this Socket socket, Memory buffer, SocketFlags socketFlags) { if (MemoryMarshal.TryGetArray((ReadOnlyMemory)buffer, out var segment)) - return await SocketTaskExtensions.ReceiveAsync(socket, segment, socketFlags); + return await SocketTaskExtensions.ReceiveAsync(socket, segment, socketFlags).ConfigureAwait(false); throw new NotSupportedException(); } diff --git a/src/Tmds.DBus.Protocol/Netstandard2_1Extensions.cs b/src/Tmds.DBus.Protocol/Netstandard2_1Extensions.cs index d9ad06ec..a71c3803 100644 --- a/src/Tmds.DBus.Protocol/Netstandard2_1Extensions.cs +++ b/src/Tmds.DBus.Protocol/Netstandard2_1Extensions.cs @@ -74,7 +74,7 @@ await Task.Factory.FromAsync( (targetEndPoint, callback, state) => ((Socket)state).BeginConnect(targetEndPoint, callback, state), asyncResult => ((Socket)asyncResult.AsyncState).EndConnect(asyncResult), remoteEP, - state: socket); + state: socket).ConfigureAwait(false); } catch (ObjectDisposedException) { diff --git a/src/Tmds.DBus.Protocol/SocketExtensions.cs b/src/Tmds.DBus.Protocol/SocketExtensions.cs index 8a4a3091..0710a9bb 100644 --- a/src/Tmds.DBus.Protocol/SocketExtensions.cs +++ b/src/Tmds.DBus.Protocol/SocketExtensions.cs @@ -60,7 +60,7 @@ private static async ValueTask SendAsync(this Socket socket, ReadOnlyMemory 0) { - int sent = await socket.SendAsync(buffer, SocketFlags.None); + int sent = await socket.SendAsync(buffer, SocketFlags.None).ConfigureAwait(false); buffer = buffer.Slice(sent); } }