From 85dbcf0a8311d1542e45e0f2eb5fc094fa8d5358 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 01:23:53 -0600 Subject: [PATCH 01/18] Added null check - the proto suggests this shouldn't ever be null, but there's an issue reporting as much, so this fixes that Signed-off-by: Whit Waldo --- .../PublishSubscribe/PublishSubscribeReceiver.cs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 886d57006..3e86ba988 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -251,6 +251,12 @@ await stream.RequestStream.WriteAsync( //Each time a message is received from the stream, push it into the topic messages channel await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken)) { + //https://github.com/dapr/dotnet-sdk/issues/1412 reports that this is sometimes null + if (response?.EventMessage is null || response.InitialResponse is null) + { + return; + } + var message = new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type, response.EventMessage.SpecVersion, response.EventMessage.DataContentType, From 054f6d9a48b10fb6de7bfa9bcb431f4ab9322845 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 01:38:20 -0600 Subject: [PATCH 02/18] Removed the Task.WhenAll making the operation non-blocking Signed-off-by: Whit Waldo --- .../PublishSubscribeReceiver.cs | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 3e86ba988..c39a0ab5b 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -115,20 +115,24 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default var stream = await GetStreamAsync(cancellationToken); - //Retrieve the messages from the sidecar and write to the messages channel - var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken); + //Retrieve the messages from the sidecar and write to the messages channel - start without awaiting so this isn't blocking + _ = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken) + .ContinueWith(HandleTaskCompletion, null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted, + TaskScheduler.Default); //Process the messages as they're written to either channel - var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken); - var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken); + _ = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken).ContinueWith(HandleTaskCompletion, + null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default); + _ = ProcessTopicChannelMessagesAsync(cancellationToken).ContinueWith(HandleTaskCompletion, null, + cancellationToken, + TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default); + } - try - { - await Task.WhenAll(fetchMessagesTask, acknowledgementProcessorTask, topicMessageProcessorTask); - } - catch (OperationCanceledException) + private static void HandleTaskCompletion(Task task, object? state) + { + if (task.Exception != null) { - // Will be cleaned up during DisposeAsync + throw task.Exception; } } From 2eb6b5d6f5ffb6c4e84d8b00b0e0e57bcbeba15c Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 04:42:02 -0600 Subject: [PATCH 03/18] Added unit test to validate that the subscription is no longer blocking Signed-off-by: Whit Waldo --- src/Dapr.Messaging/AssemblyInfo.cs | 18 +++++++ .../PublishSubscribeReceiverTests.cs | 47 +++++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 src/Dapr.Messaging/AssemblyInfo.cs create mode 100644 test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs diff --git a/src/Dapr.Messaging/AssemblyInfo.cs b/src/Dapr.Messaging/AssemblyInfo.cs new file mode 100644 index 000000000..4e2e7a0a7 --- /dev/null +++ b/src/Dapr.Messaging/AssemblyInfo.cs @@ -0,0 +1,18 @@ +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Dapr.Messaging.Test, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b1f597635c44597fcecb493e2b1327033b29b1a98ac956a1a538664b68f87d45fbaada0438a15a6265e62864947cc067d8da3a7d93c5eb2fcbb850e396c8684dba74ea477d82a1bbb18932c0efb30b64ff1677f85ae833818707ac8b49ad8062ca01d2c89d8ab1843ae73e8ba9649cd28666b539444dcdee3639f95e2a099bb2")] + + diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs new file mode 100644 index 000000000..c67d44fd7 --- /dev/null +++ b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs @@ -0,0 +1,47 @@ +using Dapr.Messaging.PublishSubscribe; +using Grpc.Core; +using Moq; +using P = Dapr.Client.Autogen.Grpc.v1; + +namespace Dapr.Messaging.Test.PublishSubscribe; + +public class PublishSubscribeReceiverTests +{ + [Fact] + public void SubscribeAsync_ShouldNotBlock() + { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + var options = + new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + { + MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1) + }; + + //var options = new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Drop)); + var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success)); + + //Mock the daprClient + var mockDaprClient = new Mock(); + + //Create a mock AsyncDuplexStreamingCall + var mockRequestStream = new Mock>(); + var mockResponseStream = new Mock>(); + var mockCall = + new AsyncDuplexStreamingCall( + mockRequestStream.Object, mockResponseStream.Object, Task.FromResult(new Metadata()), + () => new Status(), () => new Metadata(), () => { }); + + //Setup the mock to return the mock call + mockDaprClient.Setup(client => + client.SubscribeTopicEventsAlpha1(null, null, It.IsAny())) + .Returns(mockCall); + + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object); + var stopwatch = System.Diagnostics.Stopwatch.StartNew(); + var subscribeTask = receiver.SubscribeAsync(); + stopwatch.Stop(); + + Assert.True(stopwatch.ElapsedMilliseconds < 100, "SubscribeAsync should return immediately and not block"); + } +} From 8593d8ac50334aaf2725bb856c2ac52f797d2f4f Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 04:44:27 -0600 Subject: [PATCH 04/18] Removed unused line from previous test, added another test Signed-off-by: Whit Waldo --- .../PublishSubscribeReceiverTests.cs | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs index c67d44fd7..9fc7773a1 100644 --- a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs +++ b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs @@ -1,6 +1,7 @@ using Dapr.Messaging.PublishSubscribe; using Grpc.Core; using Moq; +using NuGet.Frameworks; using P = Dapr.Client.Autogen.Grpc.v1; namespace Dapr.Messaging.Test.PublishSubscribe; @@ -18,7 +19,6 @@ public void SubscribeAsync_ShouldNotBlock() MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1) }; - //var options = new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Drop)); var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success)); //Mock the daprClient @@ -44,4 +44,24 @@ public void SubscribeAsync_ShouldNotBlock() Assert.True(stopwatch.ElapsedMilliseconds < 100, "SubscribeAsync should return immediately and not block"); } + + [Fact] + public void Constructor_ShouldInitializeCorrectly() + { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + var options = + new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + { + MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1) + }; + + var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success)); + + //Mock the daprClient + var mockDaprClient = new Mock(); + var receiver = + new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object); + Assert.NotNull(receiver); + } } From 6465ad1ea5c7861d23d5596c90e168a37b146e66 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 05:17:41 -0600 Subject: [PATCH 05/18] Added another test Signed-off-by: Whit Waldo --- .../PublishSubscribeReceiver.cs | 9 ++++ .../PublishSubscribeReceiverTests.cs | 44 ++++++++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index c39a0ab5b..085acf306 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -128,6 +128,15 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default); } + /// + /// Exposed for testing purposes only. + /// + /// The test message to write. + internal async Task WriteMessageToChannelAsync(TopicMessage message) + { + await topicMessagesChannel.Writer.WriteAsync(message); + } + private static void HandleTaskCompletion(Task task, object? state) { if (task.Exception != null) diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs index 9fc7773a1..e30c8af2a 100644 --- a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs +++ b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs @@ -1,7 +1,6 @@ using Dapr.Messaging.PublishSubscribe; using Grpc.Core; using Moq; -using NuGet.Frameworks; using P = Dapr.Client.Autogen.Grpc.v1; namespace Dapr.Messaging.Test.PublishSubscribe; @@ -64,4 +63,47 @@ public void Constructor_ShouldInitializeCorrectly() new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object); Assert.NotNull(receiver); } + + [Fact] + public async Task ProcessTopicChannelMessagesAsync_ShouldProcessMessages() + { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + var options = + new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + { + MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1) + }; + + // Mock the message handler + var mockMessageHandler = new Mock(); + mockMessageHandler + .Setup(handler => handler(It.IsAny(), It.IsAny())) + .ReturnsAsync(TopicResponseAction.Success); + + //Mock the daprClient + var mockDaprClient = new Mock(); + // Create a mock AsyncDuplexStreamingCall + var mockRequestStream = new Mock>(); + var mockResponseStream = new Mock>(); + var mockCall = new AsyncDuplexStreamingCall( + mockRequestStream.Object, mockResponseStream.Object, Task.FromResult(new Metadata()), () => new Status(), () => new Metadata(), () => { }); + + //Set up the mock to return the mock call + mockDaprClient.Setup(client => client.SubscribeTopicEventsAlpha1(null, null, It.IsAny())) + .Returns(mockCall); + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, mockMessageHandler.Object, mockDaprClient.Object); + + await receiver.SubscribeAsync(); + + //Write a message to the channel + var message = new TopicMessage("id", "source", "type", "specVersion", "dataContentType", topicName, pubSubName); + await receiver.WriteMessageToChannelAsync(message); + + //Allow some time for the message to be processed + await Task.Delay(100); + + mockMessageHandler.Verify(handler => handler(It.IsAny(), It.IsAny()), + Times.Once); + } } From c92e46a996a861bd402f4f2c1effc1d27e8a7b34 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 06:07:15 -0600 Subject: [PATCH 06/18] More unit tests Signed-off-by: Whit Waldo --- .../PublishSubscribeReceiver.cs | 12 ++- .../PublishSubscribeReceiverTests.cs | 74 ++++++++++++++++++- 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 085acf306..ba8a1606a 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -77,6 +77,11 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable /// private bool isDisposed; + // Internal property for testing purposes + internal Task TopicMessagesChannelCompletion => topicMessagesChannel.Reader.Completion; + // Internal property for testing purposes + internal Task AcknowledgementsChannelCompletion => acknowledgementsChannel.Reader.Completion; + /// /// Constructs a new instance of a instance. /// @@ -137,6 +142,11 @@ internal async Task WriteMessageToChannelAsync(TopicMessage message) await topicMessagesChannel.Writer.WriteAsync(message); } + internal async Task WriteAcknowledgementToChannelAsync(TopicAcknowledgement acknowledgement) + { + await acknowledgementsChannel.Writer.WriteAsync(acknowledgement); + } + private static void HandleTaskCompletion(Task task, object? state) { if (task.Exception != null) @@ -327,6 +337,6 @@ public async ValueTask DisposeAsync() /// /// The identifier of the message. /// The action to take on the message in the acknowledgement request. - private sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action); + internal sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action); } diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs index e30c8af2a..cfddee803 100644 --- a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs +++ b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs @@ -1,4 +1,6 @@ -using Dapr.Messaging.PublishSubscribe; +using System.Threading.Channels; +using Dapr.AppCallback.Autogen.Grpc.v1; +using Dapr.Messaging.PublishSubscribe; using Grpc.Core; using Moq; using P = Dapr.Client.Autogen.Grpc.v1; @@ -106,4 +108,74 @@ public async Task ProcessTopicChannelMessagesAsync_ShouldProcessMessages() mockMessageHandler.Verify(handler => handler(It.IsAny(), It.IsAny()), Times.Once); } + + [Fact] + public async Task SubscribeAsync_ShouldProcessAcknowledgements() + { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + var options = new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(30), TopicResponseAction.Success)) + { + MaximumQueuedMessages = 100 // Example value, adjust as needed + }; + + // Mock the message handler + var mockMessageHandler = new Mock(); + mockMessageHandler + .Setup(handler => handler(It.IsAny(), It.IsAny())) + .ReturnsAsync(TopicResponseAction.Success); + + // Mock the DaprClient + var mockDaprClient = new Mock(); + + // Create a mock AsyncDuplexStreamingCall + var mockRequestStream = new Mock>(); + var mockResponseStream = new Mock>(); + var mockCall = new AsyncDuplexStreamingCall( + mockRequestStream.Object, mockResponseStream.Object, Task.FromResult(new Metadata()), () => new Status(), () => new Metadata(), () => { }); + + // Setup the mock to return the mock call + mockDaprClient.Setup(client => client.SubscribeTopicEventsAlpha1(null, null, It.IsAny())) + .Returns(mockCall); + + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, mockMessageHandler.Object, mockDaprClient.Object); + + await receiver.SubscribeAsync(); + + // Use reflection to access the private acknowledgementsChannel and write an acknowledgement + var acknowledgementsChannelField = typeof(PublishSubscribeReceiver).GetField("acknowledgementsChannel", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + if (acknowledgementsChannelField is null) + Assert.Fail(); + var acknowledgementsChannel = (Channel)acknowledgementsChannelField.GetValue(receiver)!; + + var acknowledgement = new PublishSubscribeReceiver.TopicAcknowledgement("id", TopicEventResponse.Types.TopicEventResponseStatus.Success); + await acknowledgementsChannel.Writer.WriteAsync(acknowledgement); + + // Allow some time for the acknowledgement to be processed + await Task.Delay(100); + + // Verify that the request stream's WriteAsync method was called twice (initial request + acknowledgement) + mockRequestStream.Verify(stream => stream.WriteAsync(It.IsAny(), It.IsAny()), Times.Exactly(2)); + } + + [Fact] + public async Task DisposeAsync_ShouldCompleteChannels() + { + const string pubSubName = "testPubSub"; + const string topicName = "testTopic"; + var options = + new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success)) + { + MaximumQueuedMessages = 100, MaximumCleanupTimeout = TimeSpan.FromSeconds(1) + }; + + var messageHandler = new TopicMessageHandler((message, topic) => Task.FromResult(TopicResponseAction.Success)); + var daprClient = new Mock(); + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient.Object); + + await receiver.DisposeAsync(); + + Assert.True(receiver.TopicMessagesChannelCompletion.IsCompleted); + Assert.True(receiver.AcknowledgementsChannelCompletion.IsCompleted); + } } From 0ccda6b8fcdf05bdfd37b6cdbe3c6feb65578cf6 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 06:11:13 -0600 Subject: [PATCH 07/18] Added more unit tests Signed-off-by: Whit Waldo --- ...bscribeServiceCollectionExtensionsTests.cs | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs diff --git a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs new file mode 100644 index 000000000..65411f7cd --- /dev/null +++ b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs @@ -0,0 +1,46 @@ +using Dapr.Messaging.PublishSubscribe; +using Dapr.Messaging.PublishSubscribe.Extensions; +using Microsoft.Extensions.DependencyInjection; +using Moq; + +namespace Dapr.Messaging.Test.Extensions; + +public class PublishSubscribeServiceCollectionExtensionsTests +{ + [Fact] + public void AddDaprPubSubClient_RegistersServicesCorrectly() + { + var services = new ServiceCollection(); + var httpClientFactoryMock = new Mock(); + services.AddSingleton(httpClientFactoryMock.Object); + + services.AddDaprPubSubClient(); + + var serviceProvider = services.BuildServiceProvider(); + var daprClient = serviceProvider.GetService(); + Assert.NotNull(daprClient); + } + + [Fact] + public void AddDaprPubSubClient_CallsConfigureAction() + { + var services = new ServiceCollection(); + var httpClientFactoryMock = new Mock(); + services.AddSingleton(httpClientFactoryMock.Object); + + var configureCalled = false; + + services.AddDaprPubSubClient(Configure); + + var serviceProvider = services.BuildServiceProvider(); + var daprClient = serviceProvider.GetService(); + Assert.NotNull(daprClient); + Assert.True(configureCalled); + return; + + void Configure(IServiceProvider sp, DaprPublishSubscribeClientBuilder builder) + { + configureCalled = true; + } + } +} From 09bec43da4ec490e926440c7514179649fd188c8 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 06:11:39 -0600 Subject: [PATCH 08/18] Updated to make DaprPublishSubscribeClientBuilder configurable via a registered IConfiguration Signed-off-by: Whit Waldo --- .../PublishSubscribeServiceCollectionExtensions.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs b/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs index bc60c5880..64965c948 100644 --- a/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs +++ b/src/Dapr.Messaging/PublishSubscribe/Extensions/PublishSubscribeServiceCollectionExtensions.cs @@ -1,4 +1,5 @@ -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; namespace Dapr.Messaging.PublishSubscribe.Extensions; @@ -24,8 +25,9 @@ public static IServiceCollection AddDaprPubSubClient(this IServiceCollection ser services.TryAddSingleton(serviceProvider => { var httpClientFactory = serviceProvider.GetRequiredService(); + var configuration = serviceProvider.GetService(); - var builder = new DaprPublishSubscribeClientBuilder(); + var builder = new DaprPublishSubscribeClientBuilder(configuration); builder.UseHttpClientFactory(httpClientFactory); configure?.Invoke(serviceProvider, builder); From 7900b0c6f2d1bc4018cc5cc01d49fece672c8349 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 06:18:21 -0600 Subject: [PATCH 09/18] Added missing copyright statements Signed-off-by: Whit Waldo --- ...shSubscribeServiceCollectionExtensionsTests.cs | 15 ++++++++++++++- .../PublishSubscribe/MessageHandlingPolicyTest.cs | 15 ++++++++++++++- .../PublishSubscribeReceiverTests.cs | 15 ++++++++++++++- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs index 65411f7cd..b978a011c 100644 --- a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs +++ b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs @@ -1,4 +1,17 @@ -using Dapr.Messaging.PublishSubscribe; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Messaging.PublishSubscribe; using Dapr.Messaging.PublishSubscribe.Extensions; using Microsoft.Extensions.DependencyInjection; using Moq; diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs index 0efb5e879..6efdd6397 100644 --- a/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs +++ b/test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs @@ -1,4 +1,17 @@ -using Dapr.Messaging.PublishSubscribe; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Messaging.PublishSubscribe; namespace Dapr.Messaging.Test.PublishSubscribe { diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs index cfddee803..06fa77e9a 100644 --- a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs +++ b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs @@ -1,4 +1,17 @@ -using System.Threading.Channels; +// ------------------------------------------------------------------------ +// Copyright 2024 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Threading.Channels; using Dapr.AppCallback.Autogen.Grpc.v1; using Dapr.Messaging.PublishSubscribe; using Grpc.Core; From 560aa2a56f2da2f30287ced84c4a39799d2ccbc4 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 07:35:36 -0600 Subject: [PATCH 10/18] Added missing package reference Signed-off-by: Whit Waldo --- test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj | 1 + 1 file changed, 1 insertion(+) diff --git a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj index 8f39e1713..0b10230f7 100644 --- a/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj +++ b/test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj @@ -25,6 +25,7 @@ + From f864511170396d3fe14fd092c4c816b5943d5bac Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 07:38:46 -0600 Subject: [PATCH 11/18] Fixed bad reference (missed in merge) Signed-off-by: Whit Waldo --- .../PublishSubscribeServiceCollectionExtensionsTests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs index a3fe9eba8..b1b8d4df2 100644 --- a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs +++ b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs @@ -60,6 +60,7 @@ public void AddDaprPubSubClient_RegistersServicesCorrectly() var services = new ServiceCollection(); var httpClientFactoryMock = new Mock(); services.AddSingleton(httpClientFactoryMock.Object); + var serviceProvider = services.BuildServiceProvider(); var httpClientFactory = serviceProvider.GetService(); Assert.NotNull(httpClientFactory); From 51782b0d38a573261ea8c6a92c423cc2f9a82f79 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Thu, 28 Nov 2024 07:42:51 -0600 Subject: [PATCH 12/18] Fixed failing unit test Signed-off-by: Whit Waldo --- .../PublishSubscribeServiceCollectionExtensionsTests.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs index b1b8d4df2..5cbdb2b39 100644 --- a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs +++ b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs @@ -60,6 +60,7 @@ public void AddDaprPubSubClient_RegistersServicesCorrectly() var services = new ServiceCollection(); var httpClientFactoryMock = new Mock(); services.AddSingleton(httpClientFactoryMock.Object); + services.AddDaprPubSubClient(); var serviceProvider = services.BuildServiceProvider(); var httpClientFactory = serviceProvider.GetService(); From e0b7de2d7d9c392751f5965fe119686dcbccaaf4 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Fri, 29 Nov 2024 18:04:14 -0600 Subject: [PATCH 13/18] Tweak to only pass along EventMessage payloads to developers as it's expected that the initial response will be null if EventMessage is populated Signed-off-by: Whit Waldo --- .../PublishSubscribe/PublishSubscribeReceiver.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index ba8a1606a..0dbf9c505 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -275,7 +275,8 @@ await stream.RequestStream.WriteAsync( await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken)) { //https://github.com/dapr/dotnet-sdk/issues/1412 reports that this is sometimes null - if (response?.EventMessage is null || response.InitialResponse is null) + //Skip the initial response - we only want to pass along TopicMessage payloads to developers + if (response?.EventMessage is null) { return; } From 1fce6b5bfb2c88faff6b176f604c6bfaad59e60d Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Fri, 29 Nov 2024 23:33:36 -0600 Subject: [PATCH 14/18] Was missing assignment of the Data property in the TopicMessage. Shout out to both @tommorvolloriddle and @Aimless321 for catching this! Signed-off-by: Whit Waldo --- .../PublishSubscribe/PublishSubscribeReceiver.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 0dbf9c505..6e2dda42f 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -287,7 +287,8 @@ await stream.RequestStream.WriteAsync( response.EventMessage.Topic, response.EventMessage.PubsubName) { Path = response.EventMessage.Path, - Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value) + Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value), + Data = response.EventMessage.Data.ToByteArray() }; try From b2ab6ead00f69038131b845ad11d46f55bddaa34 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Sat, 30 Nov 2024 00:57:59 -0600 Subject: [PATCH 15/18] Fix - return would be bad. Continue is the right move. Signed-off-by: Whit Waldo --- src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 6e2dda42f..3364f63b9 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -278,7 +278,7 @@ await stream.RequestStream.WriteAsync( //Skip the initial response - we only want to pass along TopicMessage payloads to developers if (response?.EventMessage is null) { - return; + continue; } var message = From d48a85edff804060fa70b87c9449581a77dd2c6d Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Tue, 3 Dec 2024 05:34:35 -0600 Subject: [PATCH 16/18] Added a simple test Signed-off-by: Whit Waldo --- .../PublishSubscribe/PublishSubscribeReceiver.cs | 4 +++- .../PublishSubscribeReceiverTests.cs | 12 ++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 3364f63b9..4b0d608ff 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -142,12 +142,14 @@ internal async Task WriteMessageToChannelAsync(TopicMessage message) await topicMessagesChannel.Writer.WriteAsync(message); } + //Exposed for testing purposes only internal async Task WriteAcknowledgementToChannelAsync(TopicAcknowledgement acknowledgement) { await acknowledgementsChannel.Writer.WriteAsync(acknowledgement); } - private static void HandleTaskCompletion(Task task, object? state) + //Exposed for testing purposes only + internal static void HandleTaskCompletion(Task task, object? state) { if (task.Exception != null) { diff --git a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs index 06fa77e9a..f8070aa66 100644 --- a/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs +++ b/test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs @@ -191,4 +191,16 @@ public async Task DisposeAsync_ShouldCompleteChannels() Assert.True(receiver.TopicMessagesChannelCompletion.IsCompleted); Assert.True(receiver.AcknowledgementsChannelCompletion.IsCompleted); } + + [Fact] + public void HandleTaskCompletion_ShouldThrowException_WhenTaskHasException() + { + var task = Task.FromException(new InvalidOperationException("Test exception")); + + var exception = Assert.Throws(() => + PublishSubscribeReceiver.HandleTaskCompletion(task, null)); + + Assert.IsType(exception.InnerException); + Assert.Equal("Test exception", exception.InnerException.Message); + } } From 35b8aca459bbc859173340025c95bf9c80ebe077 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Tue, 10 Dec 2024 19:00:58 -0600 Subject: [PATCH 17/18] Fixed unit tests Signed-off-by: Whit Waldo --- .../PublishSubscribeServiceCollectionExtensionsTests.cs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs index 5cbdb2b39..f37c23739 100644 --- a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs +++ b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs @@ -35,8 +35,6 @@ public void AddDaprPubSubClient_RegistersIHttpClientFactory() public void AddDaprPubSubClient_CallsConfigureAction() { var services = new ServiceCollection(); - var httpClientFactoryMock = new Mock(); - services.AddSingleton(httpClientFactoryMock.Object); var configureCalled = false; @@ -58,8 +56,6 @@ void Configure(IServiceProvider sp, DaprPublishSubscribeClientBuilder builder) public void AddDaprPubSubClient_RegistersServicesCorrectly() { var services = new ServiceCollection(); - var httpClientFactoryMock = new Mock(); - services.AddSingleton(httpClientFactoryMock.Object); services.AddDaprPubSubClient(); var serviceProvider = services.BuildServiceProvider(); From 31724b25a3691316155250fd56c7133f9c4edeaf Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Tue, 10 Dec 2024 19:06:48 -0600 Subject: [PATCH 18/18] Merged in tweaks from #1422 Signed-off-by: Whit Waldo --- .../DaprPublishSubscribeGrpcClient.cs | 4 +-- ...bscribeServiceCollectionExtensionsTests.cs | 26 +++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs index 39024cb35..33ef05494 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs @@ -47,7 +47,7 @@ internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClien /// public DaprPublishSubscribeGrpcClient(P.DaprClient client, HttpClient httpClient, string? daprApiToken) { - Client = client; + this.Client = client; this.HttpClient = httpClient; this.DaprApiToken = daprApiToken; } @@ -63,7 +63,7 @@ public DaprPublishSubscribeGrpcClient(P.DaprClient client, HttpClient httpClient /// public override async Task SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default) { - var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, Client); + var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, this.Client); await receiver.SubscribeAsync(cancellationToken); return receiver; } diff --git a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs index f37c23739..d8e218d52 100644 --- a/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs +++ b/test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs @@ -13,6 +13,7 @@ using Dapr.Messaging.PublishSubscribe; using Dapr.Messaging.PublishSubscribe.Extensions; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Moq; @@ -20,6 +21,31 @@ namespace Dapr.Messaging.Test.Extensions; public sealed class PublishSubscribeServiceCollectionExtensionsTests { + [Fact] + public void AddDaprMessagingClient_FromIConfiguration() + { + const string apiToken = "abc123"; + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + {"DAPR_API_TOKEN", apiToken } + }) + .Build(); + + var services = new ServiceCollection(); + + services.AddSingleton(configuration); + + services.AddDaprPubSubClient(); + + var app = services.BuildServiceProvider(); + + var pubSubClient = app.GetRequiredService() as DaprPublishSubscribeGrpcClient; + + Assert.NotNull(pubSubClient!); + Assert.Equal(apiToken, pubSubClient.DaprApiToken); + } + [Fact] public void AddDaprPubSubClient_RegistersIHttpClientFactory() {