Skip to content

Commit

Permalink
Add Connection.MonitorBusAsync.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmds committed Dec 26, 2023
1 parent 3dd7499 commit a95edcd
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 47 deletions.
43 changes: 1 addition & 42 deletions samples/Monitor/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,10 @@

string address = Address.Session ?? throw new ArgumentNullException(nameof(address));

await foreach (var dmsg in MonitorMessagesAsync(address))
await foreach (DisposableMessage dmsg in Connection.MonitorBusAsync(address))
{
using var _ = dmsg;
Message msg = dmsg.Message;

Console.WriteLine($"{msg.MessageType} {msg.SenderAsString} -> {msg.DestinationAsString}");
}

IAsyncEnumerable<DisposableMessage> MonitorMessagesAsync(string address)
{
var channel = Channel.CreateUnbounded<DisposableMessage>();

WriteMessagesToChannel(address, channel.Writer);

return channel.Reader.ReadAllAsync();

static async void WriteMessagesToChannel(string address, ChannelWriter<DisposableMessage> writer)
{
try
{
using var connection = new Connection(address);
await connection.ConnectAsync();

await connection.BecomeMonitorAsync(
(Exception? ex, DisposableMessage message) =>
{
if (ex is not null)
{
writer.TryComplete(ex);
return;
}
if (!writer.TryWrite(message))
{
message.Dispose();
}
}
);

Exception? ex = await connection.DisconnectedAsync();
writer.TryComplete(ex);
}
catch (Exception ex)
{
writer.TryComplete(ex);
}
}
}
63 changes: 63 additions & 0 deletions src/Tmds.DBus.Protocol/Connection.DBus.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Threading.Channels;

namespace Tmds.DBus.Protocol;

public partial class Connection
Expand Down Expand Up @@ -31,4 +33,65 @@ public async Task BecomeMonitorAsync(Action<Exception?, DisposableMessage> handl
DBusConnection connection = await ConnectCoreAsync().ConfigureAwait(false);
await connection.BecomeMonitorAsync(handler, rules).ConfigureAwait(false);
}

public static async IAsyncEnumerable<DisposableMessage> MonitorBusAsync(string address, IEnumerable<MatchRule>? rules = null, [EnumeratorCancellation]CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

var channel = Channel.CreateUnbounded<DisposableMessage>(
new UnboundedChannelOptions()
{
AllowSynchronousContinuations = true,
SingleReader = true,
SingleWriter = true,
}
);

using var connection = new Connection(address);
using CancellationTokenRegistration ctr =
#if NETCOREAPP3_1_OR_GREATER
ct.UnsafeRegister(c => ((Connection)c!).Dispose(), connection);
#else
ct.Register(c => ((Connection)c!).Dispose(), connection);
#endif
try
{
await connection.ConnectAsync().ConfigureAwait(false);

await connection.BecomeMonitorAsync(
(Exception? ex, DisposableMessage message) =>
{
if (ex is not null)
{
if (ct.IsCancellationRequested)
{
ex = new OperationCanceledException(ct);
}
channel.Writer.TryComplete(ex);
return;
}
if (!channel.Writer.TryWrite(message))
{
message.Dispose();
}
},
rules
).ConfigureAwait(false);
}
catch
{
ct.ThrowIfCancellationRequested();

throw;
}

while (await channel.Reader.WaitToReadAsync().ConfigureAwait(false))
{
if (channel.Reader.TryRead(out DisposableMessage msg))
{
yield return msg;
}
}
}
}
4 changes: 2 additions & 2 deletions src/Tmds.DBus.Protocol/DBusConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ private async void HandleMessages(Exception? exception, Message message)
bool runHandlerSynchronously = methodHandler.RunMethodHandlerSynchronously(message);
if (runHandlerSynchronously)
{
await methodHandler.HandleMethodAsync(context);
await methodHandler.HandleMethodAsync(context).ConfigureAwait(false);
SendUnknownMethodErrorIfNoReplySent(context);
}
else
Expand Down Expand Up @@ -406,7 +406,7 @@ private async void RunMethodHandler(IMethodHandler methodHandler, MethodContext
{
try
{
await methodHandler.HandleMethodAsync(context);
await methodHandler.HandleMethodAsync(context).ConfigureAwait(false);
SendUnknownMethodErrorIfNoReplySent(context);
context.Request.ReturnToPool();
}
Expand Down
2 changes: 1 addition & 1 deletion src/Tmds.DBus.Protocol/Netstandard2_0Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static unsafe string AsString(this Span<char> chars)
public static async ValueTask<int> ReceiveAsync(this Socket socket, Memory<byte> buffer, SocketFlags socketFlags)
{
if (MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)buffer, out var segment))
return await SocketTaskExtensions.ReceiveAsync(socket, segment, socketFlags);
return await SocketTaskExtensions.ReceiveAsync(socket, segment, socketFlags).ConfigureAwait(false);

throw new NotSupportedException();
}
Expand Down
2 changes: 1 addition & 1 deletion src/Tmds.DBus.Protocol/Netstandard2_1Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ await Task.Factory.FromAsync(
(targetEndPoint, callback, state) => ((Socket)state).BeginConnect(targetEndPoint, callback, state),
asyncResult => ((Socket)asyncResult.AsyncState).EndConnect(asyncResult),
remoteEP,
state: socket);
state: socket).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Tmds.DBus.Protocol/SocketExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private static async ValueTask SendAsync(this Socket socket, ReadOnlyMemory<byte
{
while (buffer.Length > 0)
{
int sent = await socket.SendAsync(buffer, SocketFlags.None);
int sent = await socket.SendAsync(buffer, SocketFlags.None).ConfigureAwait(false);
buffer = buffer.Slice(sent);
}
}
Expand Down

0 comments on commit a95edcd

Please sign in to comment.