Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes + unit tests for streaming PubSub implementation #1415

Merged
merged 30 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
85dbcf0
Added null check - the proto suggests this shouldn't ever be null, bu…
WhitWaldo Nov 28, 2024
054f6d9
Removed the Task.WhenAll making the operation non-blocking
WhitWaldo Nov 28, 2024
2eb6b5d
Added unit test to validate that the subscription is no longer blocking
WhitWaldo Nov 28, 2024
8593d8a
Removed unused line from previous test, added another test
WhitWaldo Nov 28, 2024
6465ad1
Added another test
WhitWaldo Nov 28, 2024
c92e46a
More unit tests
WhitWaldo Nov 28, 2024
0ccda6b
Added more unit tests
WhitWaldo Nov 28, 2024
09bec43
Updated to make DaprPublishSubscribeClientBuilder configurable via a …
WhitWaldo Nov 28, 2024
7900b0c
Added missing copyright statements
WhitWaldo Nov 28, 2024
0ae4919
Merge branch 'master' into pubsub-fix
WhitWaldo Nov 28, 2024
560aa2a
Added missing package reference
WhitWaldo Nov 28, 2024
95b102d
Merge remote-tracking branch 'origin/pubsub-fix' into pubsub-fix
WhitWaldo Nov 28, 2024
f864511
Fixed bad reference (missed in merge)
WhitWaldo Nov 28, 2024
51782b0
Fixed failing unit test
WhitWaldo Nov 28, 2024
e0b7de2
Tweak to only pass along EventMessage payloads to developers as it's …
WhitWaldo Nov 30, 2024
3e1df92
Merge branch 'master' into pubsub-fix
WhitWaldo Nov 30, 2024
1fce6b5
Was missing assignment of the Data property in the TopicMessage. Shou…
WhitWaldo Nov 30, 2024
4e71535
Merge remote-tracking branch 'origin/pubsub-fix' into pubsub-fix
WhitWaldo Nov 30, 2024
b2ab6ea
Fix - return would be bad. Continue is the right move.
WhitWaldo Nov 30, 2024
d48a85e
Added a simple test
WhitWaldo Dec 3, 2024
eea8546
Merge branch 'master' into pubsub-fix
WhitWaldo Dec 3, 2024
fd2332c
Merge branch 'master' into pubsub-fix
WhitWaldo Dec 4, 2024
b11cf16
Merge branch 'master' into pubsub-fix
WhitWaldo Dec 4, 2024
be5a8a1
Merge branch 'master' into pubsub-fix
WhitWaldo Dec 5, 2024
738f8be
Merge branch 'master' into pubsub-fix
WhitWaldo Dec 10, 2024
35b8aca
Fixed unit tests
WhitWaldo Dec 11, 2024
31724b2
Merged in tweaks from #1422
WhitWaldo Dec 11, 2024
a08b7e8
Merge branch 'master' into pubsub-fix
WhitWaldo Dec 11, 2024
b2ccb78
Merge branch 'master' into pubsub-fix
WhitWaldo Dec 11, 2024
8fb65dd
Merge branch 'master' into pubsub-fix
WhitWaldo Dec 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/Dapr.Messaging/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Dapr.Messaging.Test, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b1f597635c44597fcecb493e2b1327033b29b1a98ac956a1a538664b68f87d45fbaada0438a15a6265e62864947cc067d8da3a7d93c5eb2fcbb850e396c8684dba74ea477d82a1bbb18932c0efb30b64ff1677f85ae833818707ac8b49ad8062ca01d2c89d8ab1843ae73e8ba9649cd28666b539444dcdee3639f95e2a099bb2")]


57 changes: 45 additions & 12 deletions src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable
/// </summary>
private bool isDisposed;

// Internal property for testing purposes
internal Task TopicMessagesChannelCompletion => topicMessagesChannel.Reader.Completion;
// Internal property for testing purposes
internal Task AcknowledgementsChannelCompletion => acknowledgementsChannel.Reader.Completion;

/// <summary>
/// Constructs a new instance of a <see cref="PublishSubscribeReceiver"/> instance.
/// </summary>
Expand Down Expand Up @@ -115,20 +120,40 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default

var stream = await GetStreamAsync(cancellationToken);

//Retrieve the messages from the sidecar and write to the messages channel
var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken);
//Retrieve the messages from the sidecar and write to the messages channel - start without awaiting so this isn't blocking
_ = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken)
.ContinueWith(HandleTaskCompletion, null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted,
TaskScheduler.Default);

//Process the messages as they're written to either channel
var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken);
var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken);
_ = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken).ContinueWith(HandleTaskCompletion,
null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
_ = ProcessTopicChannelMessagesAsync(cancellationToken).ContinueWith(HandleTaskCompletion, null,
cancellationToken,
TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
}

try
{
await Task.WhenAll(fetchMessagesTask, acknowledgementProcessorTask, topicMessageProcessorTask);
}
catch (OperationCanceledException)
/// <summary>
/// Exposed for testing purposes only.
/// </summary>
/// <param name="message">The test message to write.</param>
internal async Task WriteMessageToChannelAsync(TopicMessage message)
{
await topicMessagesChannel.Writer.WriteAsync(message);
}

//Exposed for testing purposes only
internal async Task WriteAcknowledgementToChannelAsync(TopicAcknowledgement acknowledgement)
{
await acknowledgementsChannel.Writer.WriteAsync(acknowledgement);
}

//Exposed for testing purposes only
internal static void HandleTaskCompletion(Task task, object? state)
{
if (task.Exception != null)
{
// Will be cleaned up during DisposeAsync
throw task.Exception;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't kept up with the exception behavior of the latest .NET runtime; will this get handled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should wrap any exception in an AggregateException that can be caught by any outer caller since there aren't any try/catch blocks that'll transparently handle it internally.

}
}

Expand Down Expand Up @@ -251,13 +276,21 @@ await stream.RequestStream.WriteAsync(
//Each time a message is received from the stream, push it into the topic messages channel
await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken))
{
//https://github.com/dapr/dotnet-sdk/issues/1412 reports that this is sometimes null
//Skip the initial response - we only want to pass along TopicMessage payloads to developers
if (response?.EventMessage is null)
{
continue;
}

var message =
new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type,
response.EventMessage.SpecVersion, response.EventMessage.DataContentType,
response.EventMessage.Topic, response.EventMessage.PubsubName)
{
Path = response.EventMessage.Path,
Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value)
Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value),
Data = response.EventMessage.Data.ToByteArray()
};

try
Expand Down Expand Up @@ -308,6 +341,6 @@ public async ValueTask DisposeAsync()
/// </summary>
/// <param name="MessageId">The identifier of the message.</param>
/// <param name="Action">The action to take on the message in the acknowledgement request.</param>
private sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action);
internal sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action);
}

1 change: 1 addition & 0 deletions test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<PackageReference Include="Grpc.Net.Client" />
<PackageReference Include="protobuf-net.Grpc.AspNetCore" />
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
using Dapr.Messaging.PublishSubscribe;
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Messaging.PublishSubscribe;
using Dapr.Messaging.PublishSubscribe.Extensions;
using Microsoft.Extensions.DependencyInjection;
using Moq;

namespace Dapr.Messaging.Test.Extensions;

Expand All @@ -10,15 +24,49 @@
public void AddDaprPubSubClient_RegistersIHttpClientFactory()
{
var services = new ServiceCollection();

services.AddDaprPubSubClient();

var serviceProvider = services.BuildServiceProvider();
var daprClient = serviceProvider.GetService<DaprPublishSubscribeClient>();
Assert.NotNull(daprClient);
}

[Fact]
public void AddDaprPubSubClient_CallsConfigureAction()
{
var services = new ServiceCollection();
var httpClientFactoryMock = new Mock<IHttpClientFactory>();
services.AddSingleton(httpClientFactoryMock.Object);

var configureCalled = false;

services.AddDaprPubSubClient(Configure);

var serviceProvider = services.BuildServiceProvider();
var daprClient = serviceProvider.GetService<DaprPublishSubscribeClient>();

Check failure on line 46 in test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs

View workflow job for this annotation

GitHub Actions / Test .NET 6.0

Dapr.Messaging.Test.Extensions.PublishSubscribeServiceCollectionExtensionsTests.AddDaprPubSubClient_CallsConfigureAction: System.NullReferenceException : Object reference not set to an instance of an object.

Check failure on line 46 in test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs

View workflow job for this annotation

GitHub Actions / Test .NET 8.0

Dapr.Messaging.Test.Extensions.PublishSubscribeServiceCollectionExtensionsTests.AddDaprPubSubClient_CallsConfigureAction: System.NullReferenceException : Object reference not set to an instance of an object.

Check failure on line 46 in test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs

View workflow job for this annotation

GitHub Actions / Test .NET 7.0

Dapr.Messaging.Test.Extensions.PublishSubscribeServiceCollectionExtensionsTests.AddDaprPubSubClient_CallsConfigureAction: System.NullReferenceException : Object reference not set to an instance of an object.

Check failure on line 46 in test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs

View workflow job for this annotation

GitHub Actions / Test .NET 9.0

Dapr.Messaging.Test.Extensions.PublishSubscribeServiceCollectionExtensionsTests.AddDaprPubSubClient_CallsConfigureAction: System.NullReferenceException : Object reference not set to an instance of an object.
Assert.NotNull(daprClient);
Assert.True(configureCalled);
return;

void Configure(IServiceProvider sp, DaprPublishSubscribeClientBuilder builder)
{
configureCalled = true;
}
}

[Fact]
public void AddDaprPubSubClient_RegistersServicesCorrectly()
{
var services = new ServiceCollection();
var httpClientFactoryMock = new Mock<IHttpClientFactory>();
services.AddSingleton(httpClientFactoryMock.Object);
services.AddDaprPubSubClient();
var serviceProvider = services.BuildServiceProvider();

var httpClientFactory = serviceProvider.GetService<IHttpClientFactory>();
Assert.NotNull(httpClientFactory);

var daprPubSubClient = serviceProvider.GetService<DaprPublishSubscribeClient>();

Check failure on line 69 in test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs

View workflow job for this annotation

GitHub Actions / Test .NET 6.0

Dapr.Messaging.Test.Extensions.PublishSubscribeServiceCollectionExtensionsTests.AddDaprPubSubClient_RegistersServicesCorrectly: System.NullReferenceException : Object reference not set to an instance of an object.

Check failure on line 69 in test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs

View workflow job for this annotation

GitHub Actions / Test .NET 8.0

Dapr.Messaging.Test.Extensions.PublishSubscribeServiceCollectionExtensionsTests.AddDaprPubSubClient_RegistersServicesCorrectly: System.NullReferenceException : Object reference not set to an instance of an object.

Check failure on line 69 in test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs

View workflow job for this annotation

GitHub Actions / Test .NET 7.0

Dapr.Messaging.Test.Extensions.PublishSubscribeServiceCollectionExtensionsTests.AddDaprPubSubClient_RegistersServicesCorrectly: System.NullReferenceException : Object reference not set to an instance of an object.

Check failure on line 69 in test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs

View workflow job for this annotation

GitHub Actions / Test .NET 9.0

Dapr.Messaging.Test.Extensions.PublishSubscribeServiceCollectionExtensionsTests.AddDaprPubSubClient_RegistersServicesCorrectly: System.NullReferenceException : Object reference not set to an instance of an object.
Assert.NotNull(daprPubSubClient);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
using Dapr.Messaging.PublishSubscribe;
// ------------------------------------------------------------------------
// Copyright 2024 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Messaging.PublishSubscribe;

namespace Dapr.Messaging.Test.PublishSubscribe
{
Expand Down
Loading
Loading