Skip to content

Commit

Permalink
Add example.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmds committed Dec 26, 2023
1 parent f153cd4 commit 47c1c86
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Install .NET
uses: actions/setup-dotnet@v3
with:
dotnet-version: "7.0.x"
dotnet-version: "8.0.x"

- name: Install reportgenerator tool
run: dotnet tool install --global dotnet-reportgenerator-globaltool
Expand Down
14 changes: 14 additions & 0 deletions samples/Monitor/Monitor.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<ItemGroup>
<ProjectReference Include="..\..\src\Tmds.DBus.Protocol\Tmds.DBus.Protocol.csproj" />
</ItemGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

</Project>
53 changes: 53 additions & 0 deletions samples/Monitor/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using Tmds.DBus.Protocol;
using System.Threading.Channels;

string address = Address.Session ?? throw new ArgumentNullException(nameof(address));

await foreach (var dmsg in MonitorMessagesAsync(address))
{
using var _ = dmsg;
Message msg = dmsg.Message;

Console.WriteLine($"{msg.MessageType} {msg.SenderAsString} -> {msg.DestinationAsString}");
}

IAsyncEnumerable<DisposableMessage> MonitorMessagesAsync(string address)
{
var channel = Channel.CreateUnbounded<DisposableMessage>();

WriteMessagesToChannel(address, channel.Writer);

return channel.Reader.ReadAllAsync();

static async void WriteMessagesToChannel(string address, ChannelWriter<DisposableMessage> writer)
{
try
{
using var connection = new Connection(address);
await connection.ConnectAsync();

await connection.BecomeMonitorAsync(
(Exception? ex, DisposableMessage message) =>
{
if (ex is not null)
{
writer.TryComplete(ex);
return;
}
if (!writer.TryWrite(message))
{
message.Dispose();
}
}
);

Exception? ex = await connection.DisconnectedAsync();
writer.TryComplete(ex);
}
catch (Exception ex)
{
writer.TryComplete(ex);
}
}
}
68 changes: 35 additions & 33 deletions src/Tmds.DBus.Protocol/DBusConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ private async void HandleMessages(Exception? exception, Message message)
MessageHandler pendingCall = default;
IMethodHandler? methodHandler = null;
Action<Exception?, DisposableMessage>? monitor = null;
bool isMethodCall = false; // keep this false when monitor is not null
bool isMethodCall = message.MessageType == MessageType.MethodCall;

lock (_gate)
{
Expand All @@ -301,7 +301,6 @@ private async void HandleMessages(Exception? exception, Message message)

if (monitor is null)
{
isMethodCall = message.MessageType == MessageType.MethodCall;
if (message.ReplySerial.HasValue)
{
_pendingCalls.Remove(message.ReplySerial.Value, out pendingCall);
Expand All @@ -325,51 +324,53 @@ private async void HandleMessages(Exception? exception, Message message)
}
}

if (_matchedObservers.Count != 0)
if (monitor is not null)
{
foreach (var observer in _matchedObservers)
lock (monitor)
{
observer.Emit(message);
if (_monitorHandler is not null)
{
returnMessageToPool = false;
monitor(null, new DisposableMessage(message));
}
}
_matchedObservers.Clear();
}

if (pendingCall.HasValue)
{
pendingCall.Invoke(null, message);
}

if (isMethodCall)
else
{
MethodContext context = new MethodContext(_parentConnection, message); // TODO: pool.
if (methodHandler is not null)
if (_matchedObservers.Count != 0)
{
bool runHandlerSynchronously = methodHandler.RunMethodHandlerSynchronously(message);
if (runHandlerSynchronously)
{
await methodHandler.HandleMethodAsync(context);
SendUnknownMethodErrorIfNoReplySent(context);
}
else
foreach (var observer in _matchedObservers)
{
returnMessageToPool = false;
RunMethodHandler(methodHandler, context);
observer.Emit(message);
}
_matchedObservers.Clear();
}
else

if (pendingCall.HasValue)
{
SendUnknownMethodErrorIfNoReplySent(context);
pendingCall.Invoke(null, message);
}
}

if (monitor is not null)
{
lock (monitor)
if (isMethodCall)
{
if (_monitorHandler is not null)
MethodContext context = new MethodContext(_parentConnection, message); // TODO: pool.
if (methodHandler is not null)
{
returnMessageToPool = false;
monitor(null, new DisposableMessage(message));
bool runHandlerSynchronously = methodHandler.RunMethodHandlerSynchronously(message);
if (runHandlerSynchronously)
{
await methodHandler.HandleMethodAsync(context);
SendUnknownMethodErrorIfNoReplySent(context);
}
else
{
returnMessageToPool = false;
RunMethodHandler(methodHandler, context);
}
}
else
{
SendUnknownMethodErrorIfNoReplySent(context);
}
}
}
Expand Down Expand Up @@ -693,6 +694,7 @@ public async Task BecomeMonitorAsync(Action<Exception?, DisposableMessage> handl
await reply.ConfigureAwait(false);
lock (_gate)
{
_messageStream!.BecomeMonitor();
_monitorHandler = handler;
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/Tmds.DBus.Protocol/IMessageStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ interface IMessageStream

ValueTask<bool> TrySendMessageAsync(MessageBuffer message);

void BecomeMonitor();

void Close(Exception closeReason);
}
8 changes: 4 additions & 4 deletions src/Tmds.DBus.Protocol/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void ClearHeaders()
_signature.Clear();
}

internal static Message? TryReadMessage(MessagePool messagePool, ref ReadOnlySequence<byte> sequence, UnixFdCollection? handles = null)
internal static Message? TryReadMessage(MessagePool messagePool, ref ReadOnlySequence<byte> sequence, UnixFdCollection? handles = null, bool isMonitor = false)
{
SequenceReader<byte> seqReader = new(sequence);
if (!seqReader.TryRead(out byte endianness) ||
Expand Down Expand Up @@ -167,7 +167,7 @@ private void ClearHeaders()
message.Serial = serial;
message.MessageType = (MessageType)msgType;
message.MessageFlags = (MessageFlags)flags;
message.ParseHeader(handles);
message.ParseHeader(handles, isMonitor);

return message;

Expand All @@ -180,7 +180,7 @@ static bool TryReadUInt32(ref SequenceReader<byte> seqReader, bool isBigEndian,
}
}

private void ParseHeader(UnixFdCollection? handles)
private void ParseHeader(UnixFdCollection? handles, bool isMonitor)
{
var reader = new Reader(IsBigEndian, _data.AsReadOnlySequence);
reader.Advance(HeaderFieldsLengthOffset);
Expand Down Expand Up @@ -218,7 +218,7 @@ private void ParseHeader(UnixFdCollection? handles)
break;
case MessageHeader.UnixFds:
UnixFdCount = (int)reader.ReadUInt32();
if (UnixFdCount > 0)
if (UnixFdCount > 0 && !isMonitor)
{
if (handles is null || UnixFdCount > handles.Count)
{
Expand Down
12 changes: 9 additions & 3 deletions src/Tmds.DBus.Protocol/MessageStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class MessageStream : IMessageStream
private readonly PipeReader _pipeReader;

private Exception? _completionException;
private bool _isMonitor;

public MessageStream(Socket socket)
{
Expand All @@ -45,6 +46,11 @@ public MessageStream(Socket socket)
_messagePool = new();
}

public void BecomeMonitor()
{
_isMonitor = true;
}

private async void ReadFromSocketIntoPipe()
{
var writer = _pipeWriter;
Expand Down Expand Up @@ -121,7 +127,7 @@ public async void ReceiveMessages<T>(IMessageStream.MessageReceivedHandler<T> ha
ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
ReadOnlySequence<byte> buffer = result.Buffer;

ReadMessages(ref buffer, _fdCollection, _messagePool, handler, state);
ReadMessages(ref buffer, handler, state);

reader.AdvanceTo(buffer.Start, buffer.End);
}
Expand All @@ -136,10 +142,10 @@ public async void ReceiveMessages<T>(IMessageStream.MessageReceivedHandler<T> ha
_fdCollection?.Dispose();
}

static void ReadMessages<TState>(ref ReadOnlySequence<byte> buffer, UnixFdCollection? fdCollection, MessagePool messagePool, IMessageStream.MessageReceivedHandler<TState> handler, TState state)
void ReadMessages<TState>(ref ReadOnlySequence<byte> buffer, IMessageStream.MessageReceivedHandler<TState> handler, TState state)
{
Message? message;
while ((message = Message.TryReadMessage(messagePool, ref buffer, fdCollection)) != null)
while ((message = Message.TryReadMessage(_messagePool, ref buffer, _fdCollection, _isMonitor)) != null)
{
handler(closeReason: null, message, state);
}
Expand Down
3 changes: 3 additions & 0 deletions test/Tmds.DBus.Protocol.Tests/PairedConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,8 @@ public void Close(Exception closeReason)
{
TrySendMessageAsync(null!); // Use null as EOF.
}

public void BecomeMonitor()
=> throw new InvalidOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>

<IsPackable>false</IsPackable>
Expand Down
2 changes: 1 addition & 1 deletion test/Tmds.DBus.Tests/Tmds.DBus.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<TargetFramework>net8.0</TargetFramework>
<SignAssembly>true</SignAssembly>
<AssemblyOriginatorKeyFile>..\..\sign.snk</AssemblyOriginatorKeyFile>
<PublicSign>true</PublicSign>
Expand Down

0 comments on commit 47c1c86

Please sign in to comment.