Skip to content

Commit

Permalink
Protocol: add support for BecomeMonitor.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmds committed Dec 22, 2023
1 parent a0ca30c commit f153cd4
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 19 deletions.
6 changes: 6 additions & 0 deletions src/Tmds.DBus.Protocol/Connection.DBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,10 @@ MessageBuffer CreateMessage()
return writer.CreateMessage();
}
}

public async Task BecomeMonitorAsync(Action<Exception?, DisposableMessage> handler, IEnumerable<MatchRule>? rules = null)
{
DBusConnection connection = await ConnectCoreAsync().ConfigureAwait(false);
await connection.BecomeMonitorAsync(handler, rules).ConfigureAwait(false);
}
}
148 changes: 129 additions & 19 deletions src/Tmds.DBus.Protocol/DBusConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ public void Invoke(Exception? exception, Message message)
private Observer? _currentObserver;
private SynchronizationContext? _currentSynchronizationContext;
private TaskCompletionSource<Exception?>? _disconnectedTcs;
private bool _isMonitor;
private Action<Exception?, DisposableMessage>? _monitorHandler;

public string? UniqueName => _localName;

Expand Down Expand Up @@ -285,8 +287,8 @@ private async void HandleMessages(Exception? exception, Message message)
bool returnMessageToPool = true;
MessageHandler pendingCall = default;
IMethodHandler? methodHandler = null;

bool isMethodCall = message.MessageType == MessageType.MethodCall;
Action<Exception?, DisposableMessage>? monitor = null;
bool isMethodCall = false; // keep this false when monitor is not null

lock (_gate)
{
Expand All @@ -295,24 +297,30 @@ private async void HandleMessages(Exception? exception, Message message)
return;
}

if (message.ReplySerial.HasValue)
{
_pendingCalls.Remove(message.ReplySerial.Value, out pendingCall);
}
monitor = _monitorHandler;

foreach (var matchMaker in _matchMakers.Values)
if (monitor is null)
{
if (matchMaker.Matches(message))
isMethodCall = message.MessageType == MessageType.MethodCall;
if (message.ReplySerial.HasValue)
{
_matchedObservers.AddRange(matchMaker.Observers);
_pendingCalls.Remove(message.ReplySerial.Value, out pendingCall);
}
}

if (isMethodCall)
{
if (message.PathIsSet)
foreach (var matchMaker in _matchMakers.Values)
{
_pathHandlers.TryGetValue(message.PathAsString!, out methodHandler);
if (matchMaker.Matches(message))
{
_matchedObservers.AddRange(matchMaker.Observers);
}
}

if (isMethodCall)
{
if (message.PathIsSet)
{
_pathHandlers.TryGetValue(message.PathAsString!, out methodHandler);
}
}
}
}
Expand Down Expand Up @@ -354,6 +362,18 @@ private async void HandleMessages(Exception? exception, Message message)
}
}

if (monitor is not null)
{
lock (monitor)
{
if (_monitorHandler is not null)
{
returnMessageToPool = false;
monitor(null, new DisposableMessage(message));
}
}
}

if (returnMessageToPool)
{
message.ReturnToPool();
Expand Down Expand Up @@ -403,7 +423,8 @@ private void EmitOnSynchronizationContextHelper(Observer observer, Synchronizati

#pragma warning disable VSTHRD001 // Await JoinableTaskFactory.SwitchToMainThreadAsync() to switch to the UI thread instead of APIs that can deadlock or require specifying a priority.
// note: Send blocks the current thread until the SynchronizationContext ran the delegate.
synchronizationContext.Send(static o => {
synchronizationContext.Send(static o =>
{
SynchronizationContext? previousContext = SynchronizationContext.Current;
try
{
Expand Down Expand Up @@ -458,13 +479,16 @@ public void AddMethodHandlers(IList<IMethodHandler> methodHandlers)

public void Dispose()
{
Action<Exception?, DisposableMessage>? monitor = null;

lock (_gate)
{
if (_state == ConnectionState.Disconnected)
{
return;
}
_state = ConnectionState.Disconnected;
monitor = _monitorHandler;
}

Exception disconnectReason = DisconnectReason;
Expand All @@ -489,6 +513,15 @@ public void Dispose()
}
_matchMakers.Clear();

if (monitor is not null)
{
lock (monitor)
{
_monitorHandler = null;
monitor(new DisconnectedException(disconnectReason), new DisposableMessage(null));
}
}

_disconnectedTcs?.SetResult(GetWaitForDisconnectException());
}

Expand All @@ -514,6 +547,10 @@ private async ValueTask CallMethodAsync(MessageBuffer message, MessageHandler ha
{
throw new DisconnectedException(DisconnectReason!);
}
if (_isMonitor)
{
throw new InvalidOperationException("Cannot send messages on monitor connection.");
}
if ((message.MessageFlags & MessageFlags.NoReplyExpected) == 0)
{
_pendingCalls.Add(message.Serial, handler);
Expand Down Expand Up @@ -547,7 +584,6 @@ public async Task<T> CallMethodAsync<T>(MessageBuffer message, MessageValueReade
try
{
vtsState.SetResult(valueReaderState(message, state3));
}
catch (Exception ex)
{
Expand Down Expand Up @@ -615,7 +651,77 @@ private static DBusException CreateDBusExceptionForErrorMessage(Message message)
return new DBusException(errorName, errMessage);
}

public ValueTask<IDisposable> AddMatchAsync<T>(SynchronizationContext? synchronizationContext, MatchRule rule, MessageValueReader<T> valueReader,Action<Exception?, T, object?, object?> valueHandler, object? readerState, object? handlerState, bool subscribe)
public async Task BecomeMonitorAsync(Action<Exception?, DisposableMessage> handler, IEnumerable<MatchRule>? rules)
{
Task reply;

lock (_gate)
{
if (_state != ConnectionState.Connected)
{
throw new DisconnectedException(DisconnectReason!);
}
if (!RemoteIsBus)
{
throw new InvalidOperationException("The remote is not a bus.");
}
if (_matchMakers.Count != 0)
{
throw new InvalidOperationException("The connection has observers.");
}
if (_pendingCalls.Count != 0)
{
throw new InvalidOperationException("The connection has pending method calls.");
}

HashSet<string>? ruleStrings = null;
if (rules is not null)
{
ruleStrings = new();
foreach (var rule in rules)
{
ruleStrings.Add(rule.ToString());
}
}

reply = CallMethodAsync(CreateMessage(ruleStrings));
_isMonitor = true;
}

try
{
await reply.ConfigureAwait(false);
lock (_gate)
{
_monitorHandler = handler;
}
}
catch
{
lock (_gate)
{
_isMonitor = false;
}

throw;
}

MessageBuffer CreateMessage(IEnumerable<string>? rules)
{
using var writer = GetMessageWriter();
writer.WriteMethodCallHeader(
destination: Connection.DBusServiceName,
path: Connection.DBusObjectPath,
@interface: "org.freedesktop.DBus.Monitoring",
signature: "asu",
member: "BecomeMonitor");
writer.WriteArray(rules ?? Array.Empty<string>());
writer.WriteUInt32(0);
return writer.CreateMessage();
}
}

public ValueTask<IDisposable> AddMatchAsync<T>(SynchronizationContext? synchronizationContext, MatchRule rule, MessageValueReader<T> valueReader, Action<Exception?, T, object?, object?> valueHandler, object? readerState, object? handlerState, bool subscribe)
{
MessageHandlerDelegate4 fn = static (Exception? exception, Message message, object? reader, object? handler, object? rs, object? hs) =>
{
Expand Down Expand Up @@ -649,11 +755,14 @@ private async ValueTask<IDisposable> AddMatchAsync(SynchronizationContext? synch
{
throw new DisconnectedException(DisconnectReason!);
}

if (!RemoteIsBus)
{
subscribe = false;
}
if (_isMonitor)
{
throw new InvalidOperationException("Cannot add subscriptions on a monitor connection.");
}

ruleString = data.GetRuleString();

Expand Down Expand Up @@ -792,7 +901,8 @@ private void Emit(Exception exception)
else
{
_synchronizationContext.Send(
delegate {
delegate
{
SynchronizationContext? previousContext = SynchronizationContext.Current;
try
{
Expand Down
18 changes: 18 additions & 0 deletions src/Tmds.DBus.Protocol/DisposableMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace Tmds.DBus.Protocol;

public struct DisposableMessage : IDisposable
{
private Message? _message;

internal DisposableMessage(Message? message)
=> _message = message;

public Message Message
=> _message ?? throw new ObjectDisposedException(typeof(Message).FullName);

public void Dispose()
{
_message?.ReturnToPool();
_message = null;
}
}

0 comments on commit f153cd4

Please sign in to comment.