diff --git a/src/Dapr.Messaging/AssemblyInfo.cs b/src/Dapr.Messaging/AssemblyInfo.cs
new file mode 100644
index 000000000..4e2e7a0a7
--- /dev/null
+++ b/src/Dapr.Messaging/AssemblyInfo.cs
@@ -0,0 +1,18 @@
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+using System.Runtime.CompilerServices;
+
+[assembly: InternalsVisibleTo("Dapr.Messaging.Test, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b1f597635c44597fcecb493e2b1327033b29b1a98ac956a1a538664b68f87d45fbaada0438a15a6265e62864947cc067d8da3a7d93c5eb2fcbb850e396c8684dba74ea477d82a1bbb18932c0efb30b64ff1677f85ae833818707ac8b49ad8062ca01d2c89d8ab1843ae73e8ba9649cd28666b539444dcdee3639f95e2a099bb2")]
+
+
diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs
index 39024cb35..33ef05494 100644
--- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs
+++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs
@@ -47,7 +47,7 @@ internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClien
///
public DaprPublishSubscribeGrpcClient(P.DaprClient client, HttpClient httpClient, string? daprApiToken)
{
- Client = client;
+ this.Client = client;
this.HttpClient = httpClient;
this.DaprApiToken = daprApiToken;
}
@@ -63,7 +63,7 @@ public DaprPublishSubscribeGrpcClient(P.DaprClient client, HttpClient httpClient
///
public override async Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default)
{
- var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, Client);
+ var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, this.Client);
await receiver.SubscribeAsync(cancellationToken);
return receiver;
}
diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
index 886d57006..4b0d608ff 100644
--- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
+++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
@@ -77,6 +77,11 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable
///
private bool isDisposed;
+ // Internal property for testing purposes
+ internal Task TopicMessagesChannelCompletion => topicMessagesChannel.Reader.Completion;
+ // Internal property for testing purposes
+ internal Task AcknowledgementsChannelCompletion => acknowledgementsChannel.Reader.Completion;
+
///
/// Constructs a new instance of a instance.
///
@@ -115,20 +120,40 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default
var stream = await GetStreamAsync(cancellationToken);
- //Retrieve the messages from the sidecar and write to the messages channel
- var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken);
+ //Retrieve the messages from the sidecar and write to the messages channel - start without awaiting so this isn't blocking
+ _ = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken)
+ .ContinueWith(HandleTaskCompletion, null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted,
+ TaskScheduler.Default);
//Process the messages as they're written to either channel
- var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken);
- var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken);
+ _ = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken).ContinueWith(HandleTaskCompletion,
+ null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
+ _ = ProcessTopicChannelMessagesAsync(cancellationToken).ContinueWith(HandleTaskCompletion, null,
+ cancellationToken,
+ TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
+ }
- try
- {
- await Task.WhenAll(fetchMessagesTask, acknowledgementProcessorTask, topicMessageProcessorTask);
- }
- catch (OperationCanceledException)
+ ///
+ /// Exposed for testing purposes only.
+ ///
+ /// The test message to write.
+ internal async Task WriteMessageToChannelAsync(TopicMessage message)
+ {
+ await topicMessagesChannel.Writer.WriteAsync(message);
+ }
+
+ //Exposed for testing purposes only
+ internal async Task WriteAcknowledgementToChannelAsync(TopicAcknowledgement acknowledgement)
+ {
+ await acknowledgementsChannel.Writer.WriteAsync(acknowledgement);
+ }
+
+ //Exposed for testing purposes only
+ internal static void HandleTaskCompletion(Task task, object? state)
+ {
+ if (task.Exception != null)
{
- // Will be cleaned up during DisposeAsync
+ throw task.Exception;
}
}
@@ -251,13 +276,21 @@ await stream.RequestStream.WriteAsync(
//Each time a message is received from the stream, push it into the topic messages channel
await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken))
{
+ //https://github.com/dapr/dotnet-sdk/issues/1412 reports that this is sometimes null
+ //Skip the initial response - we only want to pass along TopicMessage payloads to developers
+ if (response?.EventMessage is null)
+ {
+ continue;
+ }
+
var message =
new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type,
response.EventMessage.SpecVersion, response.EventMessage.DataContentType,
response.EventMessage.Topic, response.EventMessage.PubsubName)
{
Path = response.EventMessage.Path,
- Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value)
+ Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value),
+ Data = response.EventMessage.Data.ToByteArray()
};
try
@@ -308,6 +341,6 @@ public async ValueTask DisposeAsync()
///
/// The identifier of the message.
/// The action to take on the message in the acknowledgement request.
- private sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action);
+ internal sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action);
}
diff --git a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj
index 8f39e1713..0b10230f7 100644
--- a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj
+++ b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj
@@ -25,6 +25,7 @@
+
diff --git a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs
index d239fb86d..d8e218d52 100644
--- a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs
+++ b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs
@@ -1,20 +1,90 @@
-using Dapr.Messaging.PublishSubscribe;
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+using Dapr.Messaging.PublishSubscribe;
using Dapr.Messaging.PublishSubscribe.Extensions;
+using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
+using Moq;
namespace Dapr.Messaging.Test.Extensions;
public sealed class PublishSubscribeServiceCollectionExtensionsTests
{
[Fact]
- public void AddDaprPubSubClient_RegistersIHttpClientFactory()
+ public void AddDaprMessagingClient_FromIConfiguration()
{
+ const string apiToken = "abc123";
+ var configuration = new ConfigurationBuilder()
+ .AddInMemoryCollection(new Dictionary
+ {
+ {"DAPR_API_TOKEN", apiToken }
+ })
+ .Build();
+
var services = new ServiceCollection();
+ services.AddSingleton(configuration);
+
+ services.AddDaprPubSubClient();
+
+ var app = services.BuildServiceProvider();
+
+ var pubSubClient = app.GetRequiredService() as DaprPublishSubscribeGrpcClient;
+
+ Assert.NotNull(pubSubClient!);
+ Assert.Equal(apiToken, pubSubClient.DaprApiToken);
+ }
+
+ [Fact]
+ public void AddDaprPubSubClient_RegistersIHttpClientFactory()
+ {
+ var services = new ServiceCollection();
services.AddDaprPubSubClient();
var serviceProvider = services.BuildServiceProvider();
+ var daprClient = serviceProvider.GetService();
+ Assert.NotNull(daprClient);
+ }
+
+ [Fact]
+ public void AddDaprPubSubClient_CallsConfigureAction()
+ {
+ var services = new ServiceCollection();
+
+ var configureCalled = false;
+
+ services.AddDaprPubSubClient(Configure);
+
+ var serviceProvider = services.BuildServiceProvider();
+ var daprClient = serviceProvider.GetService();
+ Assert.NotNull(daprClient);
+ Assert.True(configureCalled);
+ return;
+
+ void Configure(IServiceProvider sp, DaprPublishSubscribeClientBuilder builder)
+ {
+ configureCalled = true;
+ }
+ }
+ [Fact]
+ public void AddDaprPubSubClient_RegistersServicesCorrectly()
+ {
+ var services = new ServiceCollection();
+ services.AddDaprPubSubClient();
+ var serviceProvider = services.BuildServiceProvider();
+
var httpClientFactory = serviceProvider.GetService();
Assert.NotNull(httpClientFactory);
diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs
index 0efb5e879..6efdd6397 100644
--- a/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs
+++ b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs
@@ -1,4 +1,17 @@
-using Dapr.Messaging.PublishSubscribe;
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+using Dapr.Messaging.PublishSubscribe;
namespace Dapr.Messaging.Test.PublishSubscribe
{
diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs
new file mode 100644
index 000000000..f8070aa66
--- /dev/null
+++ b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs
@@ -0,0 +1,206 @@
+// ------------------------------------------------------------------------
+// Copyright 2024 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+using System.Threading.Channels;
+using Dapr.AppCallback.Autogen.Grpc.v1;
+using Dapr.Messaging.PublishSubscribe;
+using Grpc.Core;
+using Moq;
+using P = Dapr.Client.Autogen.Grpc.v1;
+
+namespace Dapr.Messaging.Test.PublishSubscribe;
+
+public class PublishSubscribeReceiverTests
+{
+ [Fact]
+ public void SubscribeAsync_ShouldNotBlock()
+ {
+ const string pubSubName = "testPubSub";
+ const string topicName = "testTopic";
+ var options =
+ new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success))
+ {
+ MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1)
+ };
+
+ var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success));
+
+ //Mock the daprClient
+ var mockDaprClient = new Mock();
+
+ //Create a mock AsyncDuplexStreamingCall
+ var mockRequestStream = new Mock>();
+ var mockResponseStream = new Mock>();
+ var mockCall =
+ new AsyncDuplexStreamingCall(
+ mockRequestStream.Object, mockResponseStream.Object, Task.FromResult(new Metadata()),
+ () => new Status(), () => new Metadata(), () => { });
+
+ //Setup the mock to return the mock call
+ mockDaprClient.Setup(client =>
+ client.SubscribeTopicEventsAlpha1(null, null, It.IsAny()))
+ .Returns(mockCall);
+
+ var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object);
+ var stopwatch = System.Diagnostics.Stopwatch.StartNew();
+ var subscribeTask = receiver.SubscribeAsync();
+ stopwatch.Stop();
+
+ Assert.True(stopwatch.ElapsedMilliseconds < 100, "SubscribeAsync should return immediately and not block");
+ }
+
+ [Fact]
+ public void Constructor_ShouldInitializeCorrectly()
+ {
+ const string pubSubName = "testPubSub";
+ const string topicName = "testTopic";
+ var options =
+ new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success))
+ {
+ MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1)
+ };
+
+ var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success));
+
+ //Mock the daprClient
+ var mockDaprClient = new Mock();
+ var receiver =
+ new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object);
+ Assert.NotNull(receiver);
+ }
+
+ [Fact]
+ public async Task ProcessTopicChannelMessagesAsync_ShouldProcessMessages()
+ {
+ const string pubSubName = "testPubSub";
+ const string topicName = "testTopic";
+ var options =
+ new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success))
+ {
+ MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1)
+ };
+
+ // Mock the message handler
+ var mockMessageHandler = new Mock();
+ mockMessageHandler
+ .Setup(handler => handler(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(TopicResponseAction.Success);
+
+ //Mock the daprClient
+ var mockDaprClient = new Mock();
+ // Create a mock AsyncDuplexStreamingCall
+ var mockRequestStream = new Mock>();
+ var mockResponseStream = new Mock>();
+ var mockCall = new AsyncDuplexStreamingCall(
+ mockRequestStream.Object, mockResponseStream.Object, Task.FromResult(new Metadata()), () => new Status(), () => new Metadata(), () => { });
+
+ //Set up the mock to return the mock call
+ mockDaprClient.Setup(client => client.SubscribeTopicEventsAlpha1(null, null, It.IsAny()))
+ .Returns(mockCall);
+ var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, mockMessageHandler.Object, mockDaprClient.Object);
+
+ await receiver.SubscribeAsync();
+
+ //Write a message to the channel
+ var message = new TopicMessage("id", "source", "type", "specVersion", "dataContentType", topicName, pubSubName);
+ await receiver.WriteMessageToChannelAsync(message);
+
+ //Allow some time for the message to be processed
+ await Task.Delay(100);
+
+ mockMessageHandler.Verify(handler => handler(It.IsAny(), It.IsAny()),
+ Times.Once);
+ }
+
+ [Fact]
+ public async Task SubscribeAsync_ShouldProcessAcknowledgements()
+ {
+ const string pubSubName = "testPubSub";
+ const string topicName = "testTopic";
+ var options = new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(30), TopicResponseAction.Success))
+ {
+ MaximumQueuedMessages = 100 // Example value, adjust as needed
+ };
+
+ // Mock the message handler
+ var mockMessageHandler = new Mock();
+ mockMessageHandler
+ .Setup(handler => handler(It.IsAny(), It.IsAny()))
+ .ReturnsAsync(TopicResponseAction.Success);
+
+ // Mock the DaprClient
+ var mockDaprClient = new Mock();
+
+ // Create a mock AsyncDuplexStreamingCall
+ var mockRequestStream = new Mock>();
+ var mockResponseStream = new Mock>();
+ var mockCall = new AsyncDuplexStreamingCall(
+ mockRequestStream.Object, mockResponseStream.Object, Task.FromResult(new Metadata()), () => new Status(), () => new Metadata(), () => { });
+
+ // Setup the mock to return the mock call
+ mockDaprClient.Setup(client => client.SubscribeTopicEventsAlpha1(null, null, It.IsAny()))
+ .Returns(mockCall);
+
+ var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, mockMessageHandler.Object, mockDaprClient.Object);
+
+ await receiver.SubscribeAsync();
+
+ // Use reflection to access the private acknowledgementsChannel and write an acknowledgement
+ var acknowledgementsChannelField = typeof(PublishSubscribeReceiver).GetField("acknowledgementsChannel", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance);
+ if (acknowledgementsChannelField is null)
+ Assert.Fail();
+ var acknowledgementsChannel = (Channel)acknowledgementsChannelField.GetValue(receiver)!;
+
+ var acknowledgement = new PublishSubscribeReceiver.TopicAcknowledgement("id", TopicEventResponse.Types.TopicEventResponseStatus.Success);
+ await acknowledgementsChannel.Writer.WriteAsync(acknowledgement);
+
+ // Allow some time for the acknowledgement to be processed
+ await Task.Delay(100);
+
+ // Verify that the request stream's WriteAsync method was called twice (initial request + acknowledgement)
+ mockRequestStream.Verify(stream => stream.WriteAsync(It.IsAny(), It.IsAny()), Times.Exactly(2));
+ }
+
+ [Fact]
+ public async Task DisposeAsync_ShouldCompleteChannels()
+ {
+ const string pubSubName = "testPubSub";
+ const string topicName = "testTopic";
+ var options =
+ new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success))
+ {
+ MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1)
+ };
+
+ var messageHandler = new TopicMessageHandler((message, topic) => Task.FromResult(TopicResponseAction.Success));
+ var daprClient = new Mock();
+ var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient.Object);
+
+ await receiver.DisposeAsync();
+
+ Assert.True(receiver.TopicMessagesChannelCompletion.IsCompleted);
+ Assert.True(receiver.AcknowledgementsChannelCompletion.IsCompleted);
+ }
+
+ [Fact]
+ public void HandleTaskCompletion_ShouldThrowException_WhenTaskHasException()
+ {
+ var task = Task.FromException(new InvalidOperationException("Test exception"));
+
+ var exception = Assert.Throws(() =>
+ PublishSubscribeReceiver.HandleTaskCompletion(task, null));
+
+ Assert.IsType(exception.InnerException);
+ Assert.Equal("Test exception", exception.InnerException.Message);
+ }
+}