From 7e13a22853fb413278df94805be90a2a851c5a7c Mon Sep 17 00:00:00 2001 From: Maxwell Weru Date: Thu, 6 Jun 2024 09:53:29 +0300 Subject: [PATCH] AOT compatibility This replaces `IsTrimmable` with `IsAotCompatible` which contains better analyzers. As a result, the deserialize and consume logic has been updated to handle specifics such as generics that cannot be inferred in AOT. Initial support for trimming in #564 did not have support for JSON serializer. Using the slim event bus meant you needed to create your own serialzier and register it as a default. With this PR, the `DefaultJsonEventSerializer` support the use of a `JsonSerializerContext` where each event has been declared as `EventEnvelope` A sample that supports AOT has been added to show how to work with it. This also helps during test and development. --- Directory.Build.props | 2 +- Tingle.EventBus.sln | 7 + samples/AotSupport/AotSupport.csproj | 12 ++ samples/AotSupport/Program.cs | 81 ++++++++++ .../AotSupport/Properties/launchSettings.json | 10 ++ .../AotSupport/appsettings.Development.json | 16 ++ samples/AotSupport/appsettings.json | 9 ++ src/Directory.Build.props | 2 +- ...EventBus.Serializers.NewtonsoftJson.csproj | 2 +- .../AmazonKinesisTransport.cs | 18 ++- .../ILoggerExtensions.cs | 6 +- .../AmazonSqsTransport.cs | 56 +++---- .../AzureEventHubsTransport.cs | 72 +++------ .../ILoggerExtensions.cs | 4 +- .../AzureQueueStorageTransport.cs | 57 +++---- .../AzureServiceBusTransport.cs | 59 +++---- .../ILoggerExtensions.cs | 4 +- .../ILoggerExtensions.cs | 6 +- .../InMemoryTestHarness.cs | 14 +- .../InMemoryTransport.cs | 57 +++---- .../KafkaTransport.cs | 55 +++---- .../RabbitMqTransport.cs | 58 +++---- .../EventConsumerRegistration.cs | 39 ++++- .../Configuration/EventRegistration.cs | 30 +++- .../MandatoryEventBusConfigurator.cs | 12 ++ .../DependencyInjection/EventBusBuilder.cs | 70 ++++++--- .../DependencyInjection/EventBusOptions.cs | 5 +- .../EventBusServiceCollectionExtensions.cs | 93 ++++++++--- src/Tingle.EventBus/EventBus.cs | 23 +-- src/Tingle.EventBus/EventContext.cs | 27 +++- .../Extensions/ILoggerExtensions.cs | 6 +- src/Tingle.EventBus/IEventConsumer.cs | 9 +- .../Internal/ExecutionHelper.cs | 72 +++++++++ src/Tingle.EventBus/MessageStrings.cs | 2 + .../Publisher/EventPublisher.cs | 23 +-- .../Publisher/IEventPublisher.cs | 23 +-- .../Publisher/IEventPublisherExtensions.cs | 76 ++++----- .../Publisher/WrappedEventPublisher.cs | 23 +-- .../Serialization/AbstractEventSerializer.cs | 8 +- .../DefaultJsonEventSerializer.cs | 72 +++++++-- .../Serialization/DeserializationContext.cs | 8 +- .../Serialization/IEventSerializer.cs | 19 +-- .../Serialization/SerializationContext.cs | 6 +- .../Transports/EventBusTransport.cs | 145 ++++++------------ .../Transports/IEventBusTransport.cs | 32 ++-- .../MandatoryEventBusConfiguratorTests.cs | 14 +- .../DefaultEventIdGeneratorTests.cs | 6 +- .../IotHubEventSerializerTests.cs | 16 +- 48 files changed, 885 insertions(+), 581 deletions(-) create mode 100644 samples/AotSupport/AotSupport.csproj create mode 100644 samples/AotSupport/Program.cs create mode 100644 samples/AotSupport/Properties/launchSettings.json create mode 100644 samples/AotSupport/appsettings.Development.json create mode 100644 samples/AotSupport/appsettings.json create mode 100644 src/Tingle.EventBus/Internal/ExecutionHelper.cs diff --git a/Directory.Build.props b/Directory.Build.props index 9eb1fb35..89400186 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -9,7 +9,7 @@ - $(WarningsAsErrors),IL2026 + $(WarningsAsErrors),IL2026,IL2060,IL2091,IL2095,IL3050 diff --git a/Tingle.EventBus.sln b/Tingle.EventBus.sln index 91f32443..09baadcc 100644 --- a/Tingle.EventBus.sln +++ b/Tingle.EventBus.sln @@ -54,6 +54,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{62F6 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AmazonSqsAndSns", "samples\AmazonSqsAndSns\AmazonSqsAndSns.csproj", "{C369A8E1-F29D-4705-BD38-28C3DE80D8DB}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AotSupport", "samples\AotSupport\AotSupport.csproj", "{63002328-9833-4FF3-9CE8-9134771E2455}" +EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureIotHub", "samples\AzureIotHub\AzureIotHub.csproj", "{3759B206-BF8D-4E46-9B04-1C19F156D295}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AzureManagedIdentity", "samples\AzureManagedIdentity\AzureManagedIdentity.csproj", "{A9AA8DC8-F463-4BB2-AD7B-59060C758862}" @@ -158,6 +160,10 @@ Global {C369A8E1-F29D-4705-BD38-28C3DE80D8DB}.Debug|Any CPU.Build.0 = Debug|Any CPU {C369A8E1-F29D-4705-BD38-28C3DE80D8DB}.Release|Any CPU.ActiveCfg = Release|Any CPU {C369A8E1-F29D-4705-BD38-28C3DE80D8DB}.Release|Any CPU.Build.0 = Release|Any CPU + {63002328-9833-4FF3-9CE8-9134771E2455}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {63002328-9833-4FF3-9CE8-9134771E2455}.Debug|Any CPU.Build.0 = Debug|Any CPU + {63002328-9833-4FF3-9CE8-9134771E2455}.Release|Any CPU.ActiveCfg = Release|Any CPU + {63002328-9833-4FF3-9CE8-9134771E2455}.Release|Any CPU.Build.0 = Release|Any CPU {3759B206-BF8D-4E46-9B04-1C19F156D295}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {3759B206-BF8D-4E46-9B04-1C19F156D295}.Debug|Any CPU.Build.0 = Debug|Any CPU {3759B206-BF8D-4E46-9B04-1C19F156D295}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -233,6 +239,7 @@ Global {E4D62A60-39E4-401E-B146-0EA8DA272664} = {BDD324B6-9EFC-49A3-9CF6-6CE494446C4B} {F827E66C-53DE-4366-A552-0320B3563294} = {BDD324B6-9EFC-49A3-9CF6-6CE494446C4B} {C369A8E1-F29D-4705-BD38-28C3DE80D8DB} = {62F603F3-FF36-4E36-AC0C-08D1883525BE} + {63002328-9833-4FF3-9CE8-9134771E2455} = {62F603F3-FF36-4E36-AC0C-08D1883525BE} {3759B206-BF8D-4E46-9B04-1C19F156D295} = {62F603F3-FF36-4E36-AC0C-08D1883525BE} {A9AA8DC8-F463-4BB2-AD7B-59060C758862} = {62F603F3-FF36-4E36-AC0C-08D1883525BE} {8E115759-87CC-4F45-9679-A9EBBD59992B} = {62F603F3-FF36-4E36-AC0C-08D1883525BE} diff --git a/samples/AotSupport/AotSupport.csproj b/samples/AotSupport/AotSupport.csproj new file mode 100644 index 00000000..3bccb91d --- /dev/null +++ b/samples/AotSupport/AotSupport.csproj @@ -0,0 +1,12 @@ + + + + true + true + + + + + + + diff --git a/samples/AotSupport/Program.cs b/samples/AotSupport/Program.cs new file mode 100644 index 00000000..1f1bca2a --- /dev/null +++ b/samples/AotSupport/Program.cs @@ -0,0 +1,81 @@ +using System.Text.Json.Serialization; +using Tingle.EventBus.Serialization; + +var host = Host.CreateDefaultBuilder(args) + .ConfigureServices((hostContext, services) => + { + services.AddSlimEventBus(CustomSrializerContext.Default, builder => + { + builder.AddConsumer(); + + builder.AddInMemoryTransport(); + }); + + services.AddHostedService(); + }) + .Build(); + +await host.RunAsync(); + +class ProducerService(IEventPublisher publisher) : BackgroundService +{ + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var delay = TimeSpan.FromSeconds(25); + var times = 5; + + var rnd = new Random(DateTimeOffset.UtcNow.Millisecond); + + for (var i = 0; i < times; i++) + { + var evt = new VideoUploaded + { + VideoId = Convert.ToUInt32(rnd.Next()).ToString(), + SizeBytes = Convert.ToUInt32(rnd.Next()), + }; + + evt.Url = $"https://localhost:8080/uploads/raw/{evt.VideoId}.flv"; + + await publisher.PublishAsync(evt, cancellationToken: stoppingToken); + + await Task.Delay(delay, stoppingToken); + } + } +} + +class VideoUploadedConsumer(ILogger logger) : IEventConsumer +{ + private static readonly TimeSpan SimulationDuration = TimeSpan.FromSeconds(3); + + public async Task ConsumeAsync(EventContext context, CancellationToken cancellationToken = default) + { + var evt = context.Event; + var videoId = evt.VideoId; + logger.LogInformation("Received event Id: {Id} for video '{VideoId}'.", context.Id, videoId); + + // Download video locally + logger.LogInformation("Downloading video from {VideoUrl} ({VideoSize} bytes).", evt.Url, evt.SizeBytes); + await Task.Delay(SimulationDuration, cancellationToken); // simulate using delay + + // Extract thumbnail from video + logger.LogInformation("Extracting thumbnail from video with Id '{VideoId}'.", videoId); + await Task.Delay(SimulationDuration, cancellationToken); // simulate using delay + + // Upload video thumbnail + var thumbnailUrl = $"https://localhost:8080/uploads/thumbnails/{videoId}.jpg"; + logger.LogInformation("Uploading thumbnail for video with Id '{VideoId}' to '{ThumbnailUrl}'.", videoId, thumbnailUrl); + await Task.Delay(SimulationDuration, cancellationToken); // simulate using delay + + logger.LogInformation("Processing video with Id '{VideoId}' completed.", videoId); + } +} + +class VideoUploaded +{ + public string? VideoId { get; set; } + public string? Url { get; set; } + public long SizeBytes { get; set; } +} + +[JsonSerializable(typeof(EventEnvelope))] +partial class CustomSrializerContext : JsonSerializerContext { } diff --git a/samples/AotSupport/Properties/launchSettings.json b/samples/AotSupport/Properties/launchSettings.json new file mode 100644 index 00000000..52bdf77a --- /dev/null +++ b/samples/AotSupport/Properties/launchSettings.json @@ -0,0 +1,10 @@ +{ + "profiles": { + "AotSupport": { + "commandName": "Project", + "environmentVariables": { + "DOTNET_ENVIRONMENT": "Development" + } + } + } +} \ No newline at end of file diff --git a/samples/AotSupport/appsettings.Development.json b/samples/AotSupport/appsettings.Development.json new file mode 100644 index 00000000..80698e75 --- /dev/null +++ b/samples/AotSupport/appsettings.Development.json @@ -0,0 +1,16 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Debug", + "Microsoft": "Information", + "System": "Information" + }, + "Console": { + "FormatterName": "simple", + "FormatterOptions": { + "SingleLine": true, + "TimestampFormat": "HH:mm:ss " + } + } + } +} diff --git a/samples/AotSupport/appsettings.json b/samples/AotSupport/appsettings.json new file mode 100644 index 00000000..8983e0fc --- /dev/null +++ b/samples/AotSupport/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + } + } +} diff --git a/src/Directory.Build.props b/src/Directory.Build.props index dccdba1e..127ae639 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -9,7 +9,7 @@ Tingle Software Tingle Software EventBus - true + true diff --git a/src/Tingle.EventBus.Serializers.NewtonsoftJson/Tingle.EventBus.Serializers.NewtonsoftJson.csproj b/src/Tingle.EventBus.Serializers.NewtonsoftJson/Tingle.EventBus.Serializers.NewtonsoftJson.csproj index 6e78109c..815b9406 100644 --- a/src/Tingle.EventBus.Serializers.NewtonsoftJson/Tingle.EventBus.Serializers.NewtonsoftJson.csproj +++ b/src/Tingle.EventBus.Serializers.NewtonsoftJson/Tingle.EventBus.Serializers.NewtonsoftJson.csproj @@ -4,7 +4,7 @@ A serializer implementation for Tingle.EventBus using Newtonsoft.Json $(PackageTags);Serializers;Newtonsoft Tingle.EventBus.Serializers - false + false diff --git a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs index 8a520131..c21ae605 100644 --- a/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs +++ b/src/Tingle.EventBus.Transports.Amazon.Kinesis/AmazonKinesisTransport.cs @@ -4,7 +4,9 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using System.Diagnostics.CodeAnalysis; using Tingle.EventBus.Configuration; +using Tingle.EventBus.Internal; namespace Tingle.EventBus.Transports.Amazon.Kinesis; @@ -62,10 +64,10 @@ protected override Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task PublishCoreAsync(EventContext @event, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { // log warning when trying to publish scheduled message if (scheduled != null) @@ -98,10 +100,10 @@ protected override Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task?> PublishCoreAsync(IList> events, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { // log warning when trying to publish scheduled message if (scheduled != null) diff --git a/src/Tingle.EventBus.Transports.Amazon.Kinesis/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.Amazon.Kinesis/ILoggerExtensions.cs index 4201186d..e8e7a143 100644 --- a/src/Tingle.EventBus.Transports.Amazon.Kinesis/ILoggerExtensions.cs +++ b/src/Tingle.EventBus.Transports.Amazon.Kinesis/ILoggerExtensions.cs @@ -1,4 +1,6 @@ -using Tingle.EventBus; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus; +using Tingle.EventBus.Internal; namespace Microsoft.Extensions.Logging; @@ -22,7 +24,7 @@ public static void SendingEventsToStream(this ILogger logger, IList eve eventBusIds: string.Join("\r\n- ", eventBusIds)); } - public static void SendingEventsToStream(this ILogger logger, IList> events, string entityPath, DateTimeOffset? scheduled = null) + public static void SendingEventsToStream<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(this ILogger logger, IList> events, string entityPath, DateTimeOffset? scheduled = null) where T : class { if (!logger.IsEnabled(LogLevel.Information)) return; diff --git a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs index 8bd4f736..0b06373c 100644 --- a/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs +++ b/src/Tingle.EventBus.Transports.Amazon.Sqs/AmazonSqsTransport.cs @@ -90,10 +90,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task PublishCoreAsync(EventContext @event, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { using var scope = CreateScope(); var body = await SerializeAsync(scope: scope, @@ -154,10 +154,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task?> PublishCoreAsync(IList> events, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { using var scope = CreateScope(); var sequenceNumbers = new List(); @@ -350,10 +350,6 @@ private async Task CreateQueueIfNotExistsAsync(string queueName, EventRe private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration ecr, string queueUrl, CancellationToken cancellationToken) { - var flags = System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic; - var mt = GetType().GetMethod(nameof(OnMessageReceivedAsync), flags) ?? throw new InvalidOperationException("Methods should be null"); - var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType); - while (!cancellationToken.IsCancellationRequested) { try @@ -375,7 +371,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration using var scope = CreateScope(); // shared foreach (var message in messages) { - await ((Task)method.Invoke(this, [reg, ecr, queueUrl, message, cancellationToken])!).ConfigureAwait(false); + await OnMessageReceivedAsync(reg, ecr, queueUrl, message, cancellationToken).ConfigureAwait(false); } } } @@ -392,13 +388,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration } } - private async Task OnMessageReceivedAsync(EventRegistration reg, - EventConsumerRegistration ecr, - string queueUrl, - Message message, - CancellationToken cancellationToken) - where TEvent : class - where TConsumer : IEventConsumer + private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRegistration ecr, string queueUrl, Message message, CancellationToken cancellationToken) { var messageId = message.MessageId; message.TryGetAttribute(MetadataNames.CorrelationId, out var correlationId); @@ -418,8 +408,8 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration // Instrumentation using var activity = EventBusActivitySource.StartActivity(ActivityNames.Consume, ActivityKind.Consumer, parentActivityId); - activity?.AddTag(ActivityTagNames.EventBusEventType, typeof(TEvent).FullName); - activity?.AddTag(ActivityTagNames.EventBusConsumerType, typeof(TConsumer).FullName); + activity?.AddTag(ActivityTagNames.EventBusEventType, reg.EventType.FullName); + activity?.AddTag(ActivityTagNames.EventBusConsumerType, ecr.ConsumerType.FullName); activity?.AddTag(ActivityTagNames.MessagingSystem, Name); activity?.AddTag(ActivityTagNames.MessagingDestination, reg.EventName); activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue"); @@ -430,22 +420,18 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration var contentType = contentType_str == null ? null : new ContentType(contentType_str); using var scope = CreateScope(); - var context = await DeserializeAsync(scope: scope, - body: new BinaryData(message.Body), - contentType: contentType, - registration: reg, - identifier: messageId, - raw: message, - deadletter: ecr.Deadletter, - cancellationToken: cancellationToken).ConfigureAwait(false); + var context = await DeserializeAsync(scope: scope, + body: new BinaryData(message.Body), + contentType: contentType, + registration: reg, + identifier: messageId, + raw: message, + deadletter: ecr.Deadletter, + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedMessage(messageId: messageId, eventBusId: context.Id, queueUrl: queueUrl); - var (successful, _) = await ConsumeAsync(registration: reg, - ecr: ecr, - @event: context, - scope: scope, - cancellationToken: cancellationToken).ConfigureAwait(false); + var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); // dead-letter cannot be dead-lettered again, what else can we do? if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs index 842787b5..954e0eb3 100644 --- a/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/AzureEventHubsTransport.cs @@ -46,25 +46,10 @@ protected override async Task StartCoreAsync(CancellationToken cancellationToken var processor = await GetProcessorAsync(reg: reg, ecr: ecr, cancellationToken: cancellationToken).ConfigureAwait(false); // register handlers for error and processing - processor.PartitionClosingAsync += delegate (PartitionClosingEventArgs args) - { - return OnPartitionClosingAsync(processor, args); - }; - processor.PartitionInitializingAsync += delegate (PartitionInitializingEventArgs args) - { - return OnPartitionInitializingAsync(processor, args); - }; - processor.ProcessErrorAsync += delegate (ProcessErrorEventArgs args) - { - return OnProcessErrorAsync(processor, args); - }; - processor.ProcessEventAsync += delegate (ProcessEventArgs args) - { - var flags = System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic; - var mt = GetType().GetMethod(nameof(OnEventReceivedAsync), flags) ?? throw new InvalidOperationException("Methods should be null"); - var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType); - return (Task)method.Invoke(this, [reg, ecr, processor, args])!; - }; + processor.PartitionClosingAsync += (PartitionClosingEventArgs args) => OnPartitionClosingAsync(processor, args); + processor.PartitionInitializingAsync += (PartitionInitializingEventArgs args) => OnPartitionInitializingAsync(processor, args); + processor.ProcessErrorAsync += (ProcessErrorEventArgs args) => OnProcessErrorAsync(processor, args); + processor.ProcessEventAsync += (ProcessEventArgs args) => OnEventReceivedAsync(reg, ecr, processor, args); // start processing await processor.StartProcessingAsync(cancellationToken: cancellationToken).ConfigureAwait(false); @@ -96,10 +81,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task PublishCoreAsync(EventContext @event, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { // log warning when trying to publish scheduled event if (scheduled != null) @@ -148,10 +133,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task?> PublishCoreAsync(IList> events, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { // log warning when trying to publish scheduled events if (scheduled != null) @@ -361,12 +346,7 @@ async Task creator(string key, CancellationToken ct) return processorsCache.GetOrAddAsync(key, creator, cancellationToken); } - private async Task OnEventReceivedAsync(EventRegistration reg, - EventConsumerRegistration ecr, - EventProcessorClient processor, - ProcessEventArgs args) - where TEvent : class - where TConsumer : IEventConsumer + private async Task OnEventReceivedAsync(EventRegistration reg, EventConsumerRegistration ecr, EventProcessorClient processor, ProcessEventArgs args) { Logger.ProcessorReceivedEvent(eventHubName: processor.EventHubName, consumerGroup: processor.ConsumerGroup, @@ -396,8 +376,8 @@ async Task creator(string key, CancellationToken ct) // Instrumentation using var activity = EventBusActivitySource.StartActivity(ActivityNames.Consume, ActivityKind.Consumer, parentActivityId?.ToString()); - activity?.AddTag(ActivityTagNames.EventBusEventType, typeof(TEvent).FullName); - activity?.AddTag(ActivityTagNames.EventBusConsumerType, typeof(TConsumer).FullName); + activity?.AddTag(ActivityTagNames.EventBusEventType, reg.EventType.FullName); + activity?.AddTag(ActivityTagNames.EventBusConsumerType, ecr.ConsumerType.FullName); activity?.AddTag(ActivityTagNames.MessagingSystem, Name); activity?.AddTag(ActivityTagNames.MessagingDestination, processor.EventHubName); @@ -409,14 +389,14 @@ async Task creator(string key, CancellationToken ct) sequenceNumber: data.SequenceNumber); using var scope = CreateScope(); var contentType = new ContentType(data.ContentType); - var context = await DeserializeAsync(scope: scope, - body: data.EventBody, - contentType: contentType, - registration: reg, - identifier: data.SequenceNumber.ToString(), - raw: data, - deadletter: ecr.Deadletter, - cancellationToken: cancellationToken).ConfigureAwait(false); + var context = await DeserializeAsync(scope: scope, + body: data.EventBody, + contentType: contentType, + registration: reg, + identifier: data.SequenceNumber.ToString(), + raw: data, + deadletter: ecr.Deadletter, + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedEvent(eventBusId: context.Id, eventHubName: processor.EventHubName, consumerGroup: processor.ConsumerGroup, @@ -429,11 +409,7 @@ async Task creator(string key, CancellationToken ct) .SetPartitionContext(args.Partition) .SetEventData(data); - var (successful, _) = await ConsumeAsync(registration: reg, - ecr: ecr, - @event: context, - scope: scope, - cancellationToken: cancellationToken).ConfigureAwait(false); + var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); // dead-letter cannot be dead-lettered again, what else can we do? if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails diff --git a/src/Tingle.EventBus.Transports.Azure.EventHubs/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.Azure.EventHubs/ILoggerExtensions.cs index 015a4ad4..a5ad2afd 100644 --- a/src/Tingle.EventBus.Transports.Azure.EventHubs/ILoggerExtensions.cs +++ b/src/Tingle.EventBus.Transports.Azure.EventHubs/ILoggerExtensions.cs @@ -1,6 +1,8 @@ using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; +using System.Diagnostics.CodeAnalysis; using Tingle.EventBus; +using Tingle.EventBus.Internal; namespace Microsoft.Extensions.Logging; @@ -52,7 +54,7 @@ public static void SendingEvents(this ILogger logger, IList eventBusIds eventBusIds: string.Join("\r\n- ", eventBusIds)); } - public static void SendingEvents(this ILogger logger, IList> events, string eventHubName, DateTimeOffset? scheduled = null) + public static void SendingEvents<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(this ILogger logger, IList> events, string eventHubName, DateTimeOffset? scheduled = null) where T : class { if (!logger.IsEnabled(LogLevel.Information)) return; diff --git a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs index 5fc00d5d..dc2710b6 100644 --- a/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.QueueStorage/AzureQueueStorageTransport.cs @@ -80,10 +80,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task PublishCoreAsync(EventContext @event, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { using var scope = CreateScope(); var body = await SerializeAsync(scope: scope, @@ -113,10 +113,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task?> PublishCoreAsync(IList> events, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { // log warning when doing batch Logger.BatchingNotSupported(); @@ -250,10 +250,6 @@ async Task creator((Type, bool) _, CancellationToken ct) private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration ecr, CancellationToken cancellationToken) { - var flags = System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic; - var mt = GetType().GetMethod(nameof(OnMessageReceivedAsync), flags) ?? throw new InvalidOperationException("Methods should be null"); - var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType); - var queueClient = await GetQueueClientAsync(reg: reg, deadletter: ecr.Deadletter, cancellationToken: cancellationToken).ConfigureAwait(false); while (!cancellationToken.IsCancellationRequested) @@ -276,7 +272,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration using var scope = CreateScope(); // shared foreach (var message in messages) { - await ((Task)method.Invoke(this, [reg, ecr, queueClient, message, scope, cancellationToken])!).ConfigureAwait(false); + await OnMessageReceivedAsync(reg, ecr, queueClient, message, scope, cancellationToken).ConfigureAwait(false); } } } @@ -293,14 +289,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration } } - private async Task OnMessageReceivedAsync(EventRegistration reg, - EventConsumerRegistration ecr, - QueueClient queueClient, - QueueMessage message, - IServiceScope scope, - CancellationToken cancellationToken) - where TEvent : class - where TConsumer : IEventConsumer + private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRegistration ecr, QueueClient queueClient, QueueMessage message, IServiceScope scope, CancellationToken cancellationToken) { var messageId = message.MessageId; using var log_scope = BeginLoggingScopeForConsume(id: messageId, @@ -312,21 +301,21 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration // Instrumentation using var activity = EventBusActivitySource.StartActivity(ActivityNames.Consume, ActivityKind.Consumer); // no way to get parentId at this point - activity?.AddTag(ActivityTagNames.EventBusEventType, typeof(TEvent).FullName); - activity?.AddTag(ActivityTagNames.EventBusConsumerType, typeof(TConsumer).FullName); + activity?.AddTag(ActivityTagNames.EventBusEventType, reg.EventType.FullName); + activity?.AddTag(ActivityTagNames.EventBusConsumerType, ecr.ConsumerType.FullName); activity?.AddTag(ActivityTagNames.MessagingSystem, Name); activity?.AddTag(ActivityTagNames.MessagingDestination, queueClient.Name); activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue"); Logger.ProcessingMessage(messageId: messageId, queueName: queueClient.Name); - var context = await DeserializeAsync(scope: scope, - body: message.Body, - contentType: null, - registration: reg, - identifier: (AzureQueueStorageSchedulingId)message, - raw: message, - deadletter: ecr.Deadletter, - cancellationToken: cancellationToken).ConfigureAwait(false); + var context = await DeserializeAsync(scope: scope, + body: message.Body, + contentType: null, + registration: reg, + identifier: (AzureQueueStorageSchedulingId)message, + raw: message, + deadletter: ecr.Deadletter, + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedMessage(messageId: messageId, eventBusId: context.Id, queueName: queueClient.Name); @@ -336,11 +325,7 @@ private async Task ReceiveAsync(EventRegistration reg, EventConsumerRegistration activity?.SetParentId(parentId: parentActivityId); } - var (successful, _) = await ConsumeAsync(registration: reg, - ecr: ecr, - @event: context, - scope: scope, - cancellationToken: cancellationToken).ConfigureAwait(false); + var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); // dead-letter cannot be dead-lettered again, what else can we do? if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails diff --git a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs index 0fdb6093..4697d02c 100644 --- a/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs +++ b/src/Tingle.EventBus.Transports.Azure.ServiceBus/AzureServiceBusTransport.cs @@ -83,13 +83,7 @@ protected override async Task StartCoreAsync(CancellationToken cancellationToken // register handlers for error and processing processor.ProcessErrorAsync += OnMessageFaultedAsync; - processor.ProcessMessageAsync += delegate (ProcessMessageEventArgs args) - { - var flags = System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic; - var mt = GetType().GetMethod(nameof(OnMessageReceivedAsync), flags) ?? throw new InvalidOperationException("Methods should be null"); - var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType); - return (Task)method.Invoke(this, [reg, ecr, processor, args])!; - }; + processor.ProcessMessageAsync += (ProcessMessageEventArgs args) => OnMessageReceivedAsync(reg, ecr, processor, args); // start processing Logger.StartingProcessing(entityPath: processor.EntityPath); @@ -122,10 +116,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task PublishCoreAsync(EventContext @event, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { using var scope = CreateScope(); var body = await SerializeAsync(scope: scope, @@ -182,10 +176,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task?> PublishCoreAsync(IList> events, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { using var scope = CreateScope(); var messages = new List(); @@ -507,12 +501,7 @@ private static TimeSpan SafeDuplicateDetectionHistoryTimeWindow(TimeSpan value) return TimeSpan.FromTicks(ticks); } - private async Task OnMessageReceivedAsync(EventRegistration reg, - EventConsumerRegistration ecr, - ServiceBusProcessor processor, - ProcessMessageEventArgs args) - where TEvent : class - where TConsumer : IEventConsumer + private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRegistration ecr, ServiceBusProcessor processor, ProcessMessageEventArgs args) { var entityPath = args.EntityPath; var message = args.Message; @@ -532,8 +521,8 @@ private static TimeSpan SafeDuplicateDetectionHistoryTimeWindow(TimeSpan value) // Instrumentation using var activity = EventBusActivitySource.StartActivity(ActivityNames.Consume, ActivityKind.Consumer, parentActivityId?.ToString()); - activity?.AddTag(ActivityTagNames.EventBusEventType, typeof(TEvent).FullName); - activity?.AddTag(ActivityTagNames.EventBusConsumerType, typeof(TConsumer).FullName); + activity?.AddTag(ActivityTagNames.EventBusEventType, reg.EventType.FullName); + activity?.AddTag(ActivityTagNames.EventBusConsumerType, ecr.ConsumerType.FullName); activity?.AddTag(ActivityTagNames.MessagingSystem, Name); var destination = await ShouldUseQueueAsync(reg, cancellationToken).ConfigureAwait(false) ? reg.EventName : ecr.ConsumerName; activity?.AddTag(ActivityTagNames.MessagingDestination, destination); // name of the queue/subscription @@ -542,30 +531,26 @@ private static TimeSpan SafeDuplicateDetectionHistoryTimeWindow(TimeSpan value) Logger.ProcessingMessage(messageId: messageId, entityPath: entityPath); using var scope = CreateScope(); var contentType = new ContentType(message.ContentType); - var context = await DeserializeAsync(scope: scope, - body: message.Body, - contentType: contentType, - registration: reg, - identifier: message.SequenceNumber.ToString(), - raw: message, - deadletter: ecr.Deadletter, - cancellationToken: cancellationToken).ConfigureAwait(false); + var context = await DeserializeAsync(scope: scope, + body: message.Body, + contentType: contentType, + registration: reg, + identifier: message.SequenceNumber.ToString(), + raw: message, + deadletter: ecr.Deadletter, + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedMessage(sequenceNumber: message.SequenceNumber, eventBusId: context.Id, entityPath: entityPath); // set the extras context.SetServiceBusReceivedMessage(message); - if (ecr.Deadletter && context is DeadLetteredEventContext dlec) + if (ecr.Deadletter && context is IDeadLetteredEventContext dlec) { dlec.DeadLetterReason = message.DeadLetterReason; dlec.DeadLetterErrorDescription = message.DeadLetterErrorDescription; } - var (successful, ex) = await ConsumeAsync(registration: reg, - ecr: ecr, - @event: context, - scope: scope, - cancellationToken: cancellationToken).ConfigureAwait(false); + var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); // Decide the action to execute then execute var action = DecideAction(successful, ecr.UnhandledErrorBehaviour, processor.AutoCompleteMessages); diff --git a/src/Tingle.EventBus.Transports.Azure.ServiceBus/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.Azure.ServiceBus/ILoggerExtensions.cs index 80e299a7..2d15db09 100644 --- a/src/Tingle.EventBus.Transports.Azure.ServiceBus/ILoggerExtensions.cs +++ b/src/Tingle.EventBus.Transports.Azure.ServiceBus/ILoggerExtensions.cs @@ -1,5 +1,7 @@ using Azure.Messaging.ServiceBus; +using System.Diagnostics.CodeAnalysis; using Tingle.EventBus; +using Tingle.EventBus.Internal; using Tingle.EventBus.Transports.Azure.ServiceBus; namespace Microsoft.Extensions.Logging; @@ -86,7 +88,7 @@ public static void SendingMessages(this ILogger logger, IList eventBusI eventBusIds: string.Join("\r\n- ", eventBusIds)); } - public static void SendingMessages(this ILogger logger, IList> events, string entityPath, DateTimeOffset? scheduled = null) + public static void SendingMessages<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(this ILogger logger, IList> events, string entityPath, DateTimeOffset? scheduled = null) where T : class { if (!logger.IsEnabled(LogLevel.Information)) return; diff --git a/src/Tingle.EventBus.Transports.InMemory/ILoggerExtensions.cs b/src/Tingle.EventBus.Transports.InMemory/ILoggerExtensions.cs index ff7cc6bb..c497d1f5 100644 --- a/src/Tingle.EventBus.Transports.InMemory/ILoggerExtensions.cs +++ b/src/Tingle.EventBus.Transports.InMemory/ILoggerExtensions.cs @@ -1,4 +1,6 @@ -using Tingle.EventBus; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus; +using Tingle.EventBus.Internal; using Tingle.EventBus.Transports.InMemory.Client; namespace Microsoft.Extensions.Logging; @@ -44,7 +46,7 @@ public static void SendingMessages(this ILogger logger, IList eventBusI eventBusIds: string.Join("\r\n- ", eventBusIds)); } - public static void SendingMessages(this ILogger logger, IList> events, string entityPath, DateTimeOffset? scheduled = null) + public static void SendingMessages<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(this ILogger logger, IList> events, string entityPath, DateTimeOffset? scheduled = null) where T : class { if (!logger.IsEnabled(LogLevel.Information)) return; diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTestHarness.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTestHarness.cs index 26f65482..6693c877 100644 --- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTestHarness.cs +++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTestHarness.cs @@ -1,5 +1,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus.Internal; namespace Tingle.EventBus.Transports.InMemory; @@ -46,7 +48,7 @@ public async Task> PublishedAsync(TimeSpan? delay = nu /// Gets all the published events of a given type. /// The type of event carried. /// - public IEnumerable> Published() where T : class => transport.Published.OfType>(); + public IEnumerable> Published<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>() where T : class => transport.Published.OfType>(); /// /// Gets all the published events of a given type. @@ -58,7 +60,7 @@ public async Task> PublishedAsync(TimeSpan? delay = nu /// /// /// - public async Task>> PublishedAsync(TimeSpan? delay = null, CancellationToken cancellationToken = default) + public async Task>> PublishedAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(TimeSpan? delay = null, CancellationToken cancellationToken = default) where T : class { await Task.Delay(delay ?? options.DefaultDelay, cancellationToken).ConfigureAwait(false); @@ -109,7 +111,7 @@ public async Task> ConsumedAsync(TimeSpan? delay = nul /// Gets all the consumed events of a given type. /// The type of event carried. /// - public IEnumerable> Consumed() where T : class => transport.Consumed.OfType>(); + public IEnumerable> Consumed<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>() where T : class => transport.Consumed.OfType>(); /// /// Get all the consumed events of a given type. @@ -121,7 +123,7 @@ public async Task> ConsumedAsync(TimeSpan? delay = nul /// /// /// - public async Task>> ConsumedAsync(TimeSpan? delay = null, CancellationToken cancellationToken = default) + public async Task>> ConsumedAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(TimeSpan? delay = null, CancellationToken cancellationToken = default) where T : class { await Task.Delay(delay ?? options.DefaultDelay, cancellationToken).ConfigureAwait(false); @@ -151,7 +153,7 @@ public async Task> FailedAsync(TimeSpan? delay = null, /// /// Gets all the failed events of a given type. /// - public IEnumerable> Failed() where T : class => transport.Failed.OfType>(); + public IEnumerable> Failed<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>() where T : class => transport.Failed.OfType>(); /// /// Gets all the failed events of a given type. @@ -163,7 +165,7 @@ public async Task> FailedAsync(TimeSpan? delay = null, /// /// /// - public async Task>> FailedAsync(TimeSpan? delay = null, CancellationToken cancellationToken = default) + public async Task>> FailedAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(TimeSpan? delay = null, CancellationToken cancellationToken = default) where T : class { await Task.Delay(delay ?? options.DefaultDelay, cancellationToken).ConfigureAwait(false); diff --git a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs index 1c4defb8..dfabff24 100644 --- a/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs +++ b/src/Tingle.EventBus.Transports.InMemory/InMemoryTransport.cs @@ -69,13 +69,7 @@ protected override async Task StartCoreAsync(CancellationToken cancellationToken // register handlers for error and processing processor.ProcessErrorAsync += OnMessageFaultedAsync; - processor.ProcessMessageAsync += delegate (ProcessMessageEventArgs args) - { - var flags = System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic; - var mt = GetType().GetMethod(nameof(OnMessageReceivedAsync), flags) ?? throw new InvalidOperationException("Methods should be null"); - var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType); - return (Task)method.Invoke(this, [reg, ecr, processor, args])!; - }; + processor.ProcessMessageAsync += (ProcessMessageEventArgs args) => OnMessageReceivedAsync(reg, ecr, processor, args); // start processing Logger.StartingProcessing(entityPath: processor.EntityPath); @@ -108,10 +102,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task PublishCoreAsync(EventContext @event, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { // log warning when trying to publish scheduled message if (scheduled != null) @@ -163,10 +157,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected async override Task?> PublishCoreAsync(IList> events, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected async override Task?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { // log warning when trying to publish scheduled message if (scheduled != null) @@ -315,12 +309,7 @@ Task creator(string _, CancellationToken ct) return processorsCache.GetOrAddAsync(key, creator, cancellationToken); } - private async Task OnMessageReceivedAsync(EventRegistration reg, - EventConsumerRegistration ecr, - InMemoryProcessor processor, - ProcessMessageEventArgs args) - where TEvent : class - where TConsumer : IEventConsumer + private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRegistration ecr, InMemoryProcessor processor, ProcessMessageEventArgs args) { var entityPath = processor.EntityPath; var message = args.Message; @@ -339,8 +328,8 @@ Task creator(string _, CancellationToken ct) // Instrumentation using var activity = EventBusActivitySource.StartActivity(ActivityNames.Consume, ActivityKind.Consumer, parentActivityId?.ToString()); - activity?.AddTag(ActivityTagNames.EventBusEventType, typeof(TEvent).FullName); - activity?.AddTag(ActivityTagNames.EventBusConsumerType, typeof(TConsumer).FullName); + activity?.AddTag(ActivityTagNames.EventBusEventType, reg.EventType.FullName); + activity?.AddTag(ActivityTagNames.EventBusConsumerType, ecr.ConsumerType.FullName); activity?.AddTag(ActivityTagNames.MessagingSystem, Name); var destination = reg.EntityKind == EntityKind.Queue ? reg.EventName : ecr.ConsumerName; activity?.AddTag(ActivityTagNames.MessagingDestination, destination); // name of the queue/subscription @@ -349,25 +338,21 @@ Task creator(string _, CancellationToken ct) Logger.ProcessingMessage(messageId: messageId, entityPath: entityPath); using var scope = CreateScope(); var contentType = message.ContentType is not null ? new ContentType(message.ContentType) : null; - var context = await DeserializeAsync(scope: scope, - body: message.Body, - contentType: contentType, - registration: reg, - identifier: message.SequenceNumber.ToString(), - raw: message, - deadletter: ecr.Deadletter, - cancellationToken: cancellationToken).ConfigureAwait(false); + var context = await DeserializeAsync(scope: scope, + body: message.Body, + contentType: contentType, + registration: reg, + identifier: message.SequenceNumber.ToString(), + raw: message, + deadletter: ecr.Deadletter, + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedMessage(sequenceNumber: message.SequenceNumber, eventBusId: context.Id, entityPath: entityPath); // set the extras context.SetInMemoryReceivedMessage(message); - var (successful, _) = await ConsumeAsync(registration: reg, - ecr: ecr, - @event: context, - scope: scope, - cancellationToken: cancellationToken).ConfigureAwait(false); + var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); // Add to Consumed/Failed list if (successful) consumed.Add(context); diff --git a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs index cbc0e3f0..c12e42f4 100644 --- a/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs +++ b/src/Tingle.EventBus.Transports.Kafka/KafkaTransport.cs @@ -113,10 +113,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected async override Task PublishCoreAsync(EventContext @event, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected async override Task PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { // log warning when trying to publish scheduled message if (scheduled != null) @@ -158,10 +158,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected async override Task?> PublishCoreAsync(IList> events, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected async override Task?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { // log warning when doing batch Logger.BatchingNotSupported(); @@ -232,9 +232,6 @@ protected override Task CancelCoreAsync(IList ids, private async Task ProcessAsync(CancellationToken cancellationToken) { - var flags = System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic; - var mt = GetType().GetMethod(nameof(OnEventReceivedAsync), flags) ?? throw new InvalidOperationException("Method should not be null"); - while (!cancellationToken.IsCancellationRequested) { try @@ -254,8 +251,7 @@ private async Task ProcessAsync(CancellationToken cancellationToken) // form the generic method var ecr = reg.Consumers.Single(); // only one consumer per event - var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType); - await ((Task)method.Invoke(this, [reg, ecr, result, cancellationToken])!).ConfigureAwait(false); + await OnEventReceivedAsync(reg, ecr, result, cancellationToken).ConfigureAwait(false); } catch (TaskCanceledException) { @@ -270,12 +266,7 @@ private async Task ProcessAsync(CancellationToken cancellationToken) } } - private async Task OnEventReceivedAsync(EventRegistration reg, - EventConsumerRegistration ecr, - ConsumeResult result, - CancellationToken cancellationToken) - where TEvent : class - where TConsumer : IEventConsumer + private async Task OnEventReceivedAsync(EventRegistration reg, EventConsumerRegistration ecr, ConsumeResult result, CancellationToken cancellationToken) { var message = result.Message; var messageKey = message.Key; @@ -298,8 +289,8 @@ private async Task ProcessAsync(CancellationToken cancellationToken) // Instrumentation using var activity = EventBusActivitySource.StartActivity(ActivityNames.Consume, ActivityKind.Consumer, parentActivityId); - activity?.AddTag(ActivityTagNames.EventBusEventType, typeof(TEvent).FullName); - activity?.AddTag(ActivityTagNames.EventBusConsumerType, typeof(TConsumer).FullName); + activity?.AddTag(ActivityTagNames.EventBusEventType, reg.EventType.FullName); + activity?.AddTag(ActivityTagNames.EventBusConsumerType, ecr.ConsumerType.FullName); activity?.AddTag(ActivityTagNames.MessagingSystem, Name); activity?.AddTag(ActivityTagNames.MessagingDestination, result.Topic); @@ -309,24 +300,20 @@ private async Task ProcessAsync(CancellationToken cancellationToken) offset: result.Offset); using var scope = CreateScope(); var contentType = contentType_str == null ? null : new ContentType(contentType_str); - var context = await DeserializeAsync(scope: scope, - body: new BinaryData(message.Value), - contentType: contentType, - registration: reg, - identifier: result.Offset.ToString(), - raw: message, - deadletter: ecr.Deadletter, - cancellationToken: cancellationToken).ConfigureAwait(false); + var context = await DeserializeAsync(scope: scope, + body: new BinaryData(message.Value), + contentType: contentType, + registration: reg, + identifier: result.Offset.ToString(), + raw: message, + deadletter: ecr.Deadletter, + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.ReceivedEvent(eventBusId: context.Id, topic: result.Topic, partition: result.Partition, offset: result.Offset); - var (successful, _) = await ConsumeAsync(registration: reg, - ecr: ecr, - @event: context, - scope: scope, - cancellationToken: cancellationToken).ConfigureAwait(false); + var (successful, _) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); // dead-letter cannot be dead-lettered again, what else can we do? if (ecr.Deadletter) return; // TODO: figure out what to do when dead-letter fails diff --git a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs index 4d37283b..3fb973a4 100644 --- a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs +++ b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs @@ -69,10 +69,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task PublishCoreAsync(EventContext @event, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { if (!IsConnected) { @@ -141,10 +141,10 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) } /// - protected override async Task?> PublishCoreAsync(IList> events, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected override async Task?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) { if (!IsConnected) { @@ -272,25 +272,13 @@ private async Task ConnectConsumersAsync(CancellationToken cancellationToken) var channel = await GetSubscriptionChannelAsync(exchangeName: exchangeName, queueName: queueName, cancellationToken).ConfigureAwait(false); var consumer = new AsyncEventingBasicConsumer(channel); - consumer.Received += delegate (object sender, BasicDeliverEventArgs @event) - { - var flags = System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic; - var mt = GetType().GetMethod(nameof(OnMessageReceivedAsync), flags) ?? throw new InvalidOperationException("Methods should be null"); - var method = mt.MakeGenericMethod(reg.EventType, ecr.ConsumerType); - return (Task)method.Invoke(this, [reg, ecr, channel, @event, CancellationToken.None])!; // do not chain CancellationToken - }; + consumer.Received += (object sender, BasicDeliverEventArgs @event) => OnMessageReceivedAsync(reg, ecr, channel, @event, CancellationToken.None); // do not chain CancellationToken channel.BasicConsume(queue: queueName, autoAck: false, consumer); } } } - private async Task OnMessageReceivedAsync(EventRegistration reg, - EventConsumerRegistration ecr, - IModel channel, - BasicDeliverEventArgs args, - CancellationToken cancellationToken) - where TEvent : class - where TConsumer : IEventConsumer + private async Task OnMessageReceivedAsync(EventRegistration reg, EventConsumerRegistration ecr, IModel channel, BasicDeliverEventArgs args, CancellationToken cancellationToken) { var messageId = args.BasicProperties?.MessageId; using var log_scope = BeginLoggingScopeForConsume(id: messageId, @@ -306,8 +294,8 @@ private async Task ConnectConsumersAsync(CancellationToken cancellationToken) // Instrumentation using var activity = EventBusActivitySource.StartActivity(ActivityNames.Consume, ActivityKind.Consumer, parentActivityId?.ToString()); - activity?.AddTag(ActivityTagNames.EventBusEventType, typeof(TEvent).FullName); - activity?.AddTag(ActivityTagNames.EventBusConsumerType, typeof(TConsumer).FullName); + activity?.AddTag(ActivityTagNames.EventBusEventType, reg.EventType.FullName); + activity?.AddTag(ActivityTagNames.EventBusConsumerType, ecr.ConsumerType.FullName); activity?.AddTag(ActivityTagNames.MessagingSystem, Name); activity?.AddTag(ActivityTagNames.MessagingDestination, ecr.ConsumerName); activity?.AddTag(ActivityTagNames.MessagingDestinationKind, "queue"); // only queues are possible @@ -315,22 +303,18 @@ private async Task ConnectConsumersAsync(CancellationToken cancellationToken) Logger.LogDebug("Processing '{MessageId}'", messageId); using var scope = CreateScope(); var contentType = GetContentType(args.BasicProperties); - var context = await DeserializeAsync(scope: scope, - body: new BinaryData(args.Body), - contentType: contentType, - registration: reg, - identifier: messageId, - raw: args, - deadletter: ecr.Deadletter, - cancellationToken: cancellationToken).ConfigureAwait(false); + var context = await DeserializeAsync(scope: scope, + body: new BinaryData(args.Body), + contentType: contentType, + registration: reg, + identifier: messageId, + raw: args, + deadletter: ecr.Deadletter, + cancellationToken: cancellationToken).ConfigureAwait(false); Logger.LogInformation("Received message: '{MessageId}' containing Event '{Id}'", messageId, context.Id); - var (successful, ex) = await ConsumeAsync(registration: reg, - ecr: ecr, - @event: context, - scope: scope, - cancellationToken: cancellationToken).ConfigureAwait(false); + var (successful, ex) = await ConsumeAsync(scope, reg, ecr, context, cancellationToken).ConfigureAwait(false); // Decide the action to execute then execute var action = DecideAction(successful, ecr.UnhandledErrorBehaviour); diff --git a/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs b/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs index d018a4c9..e39d9d06 100644 --- a/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs +++ b/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs @@ -6,15 +6,13 @@ namespace Tingle.EventBus.Configuration; /// /// Represents a registration for a consumer of an event. /// -/// The type of consumer handling the event. -/// Whether the consumer should be connected to the dead-letter entity. -public class EventConsumerRegistration(Type consumerType, bool deadletter) : IEquatable +public class EventConsumerRegistration : IEquatable { /// /// The type of consumer handling the event. /// [DynamicallyAccessedMembers(TrimmingHelper.Consumer)] - public Type ConsumerType { get; } = consumerType ?? throw new ArgumentNullException(nameof(consumerType)); + public Type ConsumerType { get; } /// /// Gets or sets a value indicating if the consumer should be connected to the dead-letter entity. @@ -22,7 +20,7 @@ public class EventConsumerRegistration(Type consumerType, bool deadletter) : IEq /// When set to , you must use /// to consume events. /// - public bool Deadletter { get; } = deadletter; + public bool Deadletter { get; } /// /// The name generated for the consumer. @@ -49,6 +47,8 @@ public class EventConsumerRegistration(Type consumerType, bool deadletter) : IEq /// public IDictionary Metadata { get; set; } = new Dictionary(); + internal ConsumeDelegate Consume { get; init; } = default!; + /// /// Sets to . /// @@ -72,6 +72,27 @@ public EventConsumerRegistration OnError(UnhandledConsumerErrorBehaviour? behavi /// The for further configuration. public EventConsumerRegistration OnErrorDiscard() => OnError(UnhandledConsumerErrorBehaviour.Discard); + private EventConsumerRegistration(Type consumerType) // this private to enforce use of the factory methods which cater to generics in AOT + { + ConsumerType = consumerType ?? throw new ArgumentNullException(nameof(consumerType)); + Deadletter = typeof(IDeadLetteredEventConsumer<>).IsAssignableFrom(consumerType); + } + + /// Create a new instance of for the specified types. + /// The type of event. + /// The type of consumer. + public static EventConsumerRegistration Create<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent, [DynamicallyAccessedMembers(TrimmingHelper.Consumer)] TConsumer>() + where TEvent : class + where TConsumer : IEventConsumer + => new(typeof(TConsumer)) { Consume = ExecutionHelper.ConsumeAsync }; + + /// Create a new instance of for the specified types. + /// The type of event. + /// The type of consumer. + [RequiresDynamicCode(MessageStrings.GenericsDynamicCodeMessage)] + public static EventConsumerRegistration Create([DynamicallyAccessedMembers(TrimmingHelper.Event)] Type eventType, [DynamicallyAccessedMembers(TrimmingHelper.Consumer)] Type consumerType) + => new(consumerType) { Consume = ExecutionHelper.MakeDelegate(eventType, consumerType), }; + #region Equality Overrides /// @@ -99,3 +120,11 @@ public bool Equals(EventConsumerRegistration? other) #endregion } + +/// Delegate for consuming an event payload to an . +/// The to use. +/// The for the current event. +/// The for the current event. +/// The context containing the event. +/// +internal delegate Task ConsumeDelegate(IEventConsumer consumer, EventRegistration registration, EventConsumerRegistration ecr, EventContext context, CancellationToken cancellationToken = default); diff --git a/src/Tingle.EventBus/Configuration/EventRegistration.cs b/src/Tingle.EventBus/Configuration/EventRegistration.cs index ff28cea9..e8a92c4b 100644 --- a/src/Tingle.EventBus/Configuration/EventRegistration.cs +++ b/src/Tingle.EventBus/Configuration/EventRegistration.cs @@ -8,14 +8,13 @@ namespace Tingle.EventBus.Configuration; /// /// Represents a registration for an event. /// -/// The type of event handled. -public class EventRegistration(Type eventType) : IEquatable +public class EventRegistration : IEquatable { /// /// The type of event handled. /// [DynamicallyAccessedMembers(TrimmingHelper.Event)] - public Type EventType { get; } = eventType ?? throw new ArgumentNullException(nameof(eventType)); + public Type EventType { get; } /// /// The name generated for the event. @@ -91,6 +90,24 @@ public class EventRegistration(Type eventType) : IEquatable /// The final resilience pipeline used in executions for the event and it's consumers. internal ResiliencePipeline ExecutionPipeline { get; set; } = ResiliencePipeline.Empty; + internal DeserializerDelegate Deserializer { get; set; } = default!; + + private EventRegistration(Type eventType) // this private to enforce use of the factory methods which cater to generics in AOT + { + EventType = eventType ?? throw new ArgumentNullException(nameof(eventType)); + } + + /// Create a new instance of for the specified event type. + /// The type of event. + public static EventRegistration Create<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>() where T : class + => new(typeof(T)) { Deserializer = ExecutionHelper.DeserializeToContextAsync, }; + + /// Create a new instance of for the specified event type. + /// The type of event. + [RequiresDynamicCode(MessageStrings.GenericsDynamicCodeMessage)] + public static EventRegistration Create([DynamicallyAccessedMembers(TrimmingHelper.Event)] Type eventType) + => new(eventType) { Deserializer = ExecutionHelper.MakeDelegate(eventType), }; + #region Equality Overrides /// @@ -117,3 +134,10 @@ public bool Equals(EventRegistration? other) #endregion } + +/// Delegate for deserializing an event payload to an . +/// The to use. +/// The to use. +/// The to use. +/// +internal delegate Task DeserializerDelegate(IEventSerializer serializer, DeserializationContext context, IEventPublisher publisher, CancellationToken cancellationToken = default); diff --git a/src/Tingle.EventBus/Configuration/MandatoryEventBusConfigurator.cs b/src/Tingle.EventBus/Configuration/MandatoryEventBusConfigurator.cs index 217ae270..ce6e348d 100644 --- a/src/Tingle.EventBus/Configuration/MandatoryEventBusConfigurator.cs +++ b/src/Tingle.EventBus/Configuration/MandatoryEventBusConfigurator.cs @@ -80,6 +80,12 @@ internal static void ConfigureEventName(EventRegistration reg, EventBusNamingOpt name = options.ReplaceInvalidCharacters(name); } reg.EventName = name; + + // ensure the deserialize delegate is set + if (reg.Deserializer == null) + { + throw new InvalidOperationException($"The '{type.FullName}' does not have a deserialize delegate set yet it is required."); + } } } @@ -133,6 +139,12 @@ internal void ConfigureConsumerNames(EventRegistration reg, EventBusNamingOption } ecr.ConsumerName = name; } + + // ensure consume delegate is set + if (ecr.Consume == null) + { + throw new InvalidOperationException($"The '{ecr.ConsumerType.FullName}' does not have a consume delegate set yet it is required."); + } } } diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs b/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs index 0a7e6618..2d0957c2 100644 --- a/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs +++ b/src/Tingle.EventBus/DependencyInjection/EventBusBuilder.cs @@ -14,6 +14,8 @@ namespace Microsoft.Extensions.DependencyInjection; /// public class EventBusBuilder { + private const string ConsumersRequiresDynamicCode = "Registering consumers for all subscribed events requires dynamic code that might not be available at runtime. Register the consumer once for every event using AddConsumer(...)"; + /// /// Creates an instance of /// @@ -42,16 +44,6 @@ public EventBusBuilder(IServiceCollection services) /// public IServiceCollection Services { get; } - /// Register default services for the EventBus. - [RequiresDynamicCode(MessageStrings.RequiresDynamicCodeMessage)] - [RequiresUnreferencedCode(MessageStrings.RequiresUnreferencedCodeMessage)] - public EventBusBuilder RegisterDefaultServices() - { - Services.AddSingleton(); // can be multiple do not use TryAdd*(...) - UseDefaultSerializer(); - return this; - } - /// Configure options for the EventBus. /// /// @@ -177,12 +169,42 @@ public EventBusBuilder UseDefaultJsonSerializerTrimmable(System.Text.Json.Serial return UseDefaultSerializer(provider => ActivatorUtilities.CreateInstance(provider, [context])); } - /// - /// Subscribe to events that a consumer can listen to. - /// + /// Subscribe a consumer to events of a single type. + /// The type of event to listen to. + /// The type of consumer to handle the events. + /// + /// + public EventBusBuilder AddConsumer<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent, [DynamicallyAccessedMembers(TrimmingHelper.Consumer)] TConsumer>( + Action configure) + where TEvent : class + where TConsumer : class, IEventConsumer + { + var eventType = typeof(TEvent); + if (eventType.IsAbstract) throw new InvalidOperationException($"Abstract event types are not allowed."); + + var consumerType = typeof(TConsumer); + if (consumerType.IsAbstract) throw new InvalidOperationException($"Abstract consumer types are not allowed."); + + // add the event types to the registrations + return Configure(options => + { + // get or create a simple EventRegistration + var reg = options.Registrations.GetOrAdd(eventType, t => EventRegistration.Create()); + + // create a simple ConsumerRegistration (HashSet removes duplicates) + var ecr = EventConsumerRegistration.Create(); + reg.Consumers.Add(ecr); + + // call the configuration function + configure?.Invoke(reg, ecr); + }); + } + + /// Subscribe to events that a consumer can listen to. /// The type of consumer to handle the events. /// /// + [RequiresDynamicCode(ConsumersRequiresDynamicCode)] public EventBusBuilder AddConsumer<[DynamicallyAccessedMembers(TrimmingHelper.Consumer)] TConsumer>( Action configure) where TConsumer : class, IEventConsumer { @@ -226,10 +248,10 @@ public EventBusBuilder UseDefaultJsonSerializerTrimmable(System.Text.Json.Serial foreach (var (et, deadletter) in eventTypes) { // get or create a simple EventRegistration - var reg = options.Registrations.GetOrAdd(et, t => new EventRegistration(t)); + var reg = options.Registrations.GetOrAdd(et, t => EventRegistration.Create(et)); // create a simple ConsumerRegistration (HashSet removes duplicates) - var ecr = new EventConsumerRegistration(consumerType, deadletter); + var ecr = EventConsumerRegistration.Create(et, consumerType); reg.Consumers.Add(ecr); // call the configuration function @@ -238,12 +260,24 @@ public EventBusBuilder UseDefaultJsonSerializerTrimmable(System.Text.Json.Serial }); } - /// - /// Subscribe to events that a consumer can listen to. - /// + /// Subscribe to events that a consumer can listen to. + /// The type of event to listen to. + /// The type of consumer to handle the events. + /// + /// + public EventBusBuilder AddConsumer<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent, [DynamicallyAccessedMembers(TrimmingHelper.Consumer)] TConsumer>( + Action? configure = null) + where TEvent : class + where TConsumer : class, IEventConsumer + { + return AddConsumer((reg, ecr) => configure?.Invoke(ecr)); + } + + /// Subscribe to events that a consumer can listen to. /// The type of consumer to handle the events. /// /// + [RequiresDynamicCode(ConsumersRequiresDynamicCode)] public EventBusBuilder AddConsumer<[DynamicallyAccessedMembers(TrimmingHelper.Consumer)] TConsumer>( Action? configure = null) where TConsumer : class, IEventConsumer { diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs b/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs index 7a45dfd4..63fe7e6d 100644 --- a/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs +++ b/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs @@ -4,6 +4,7 @@ using System.Diagnostics.CodeAnalysis; using Tingle.EventBus; using Tingle.EventBus.Configuration; +using Tingle.EventBus.Internal; using Tingle.EventBus.Transports; namespace Microsoft.Extensions.DependencyInjection; @@ -183,13 +184,13 @@ public bool TryGetConsumerRegistrations([NotNullWhen(true)] o /// The event to configure for /// /// - public EventBusOptions ConfigureEvent(Action configure) + public EventBusOptions ConfigureEvent<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(Action configure) where TEvent : class { if (configure is null) throw new ArgumentNullException(nameof(configure)); // if there's already a registration for the event return it var eventType = typeof(TEvent); - var registration = Registrations.GetOrAdd(eventType, et => new EventRegistration(et)); + var registration = Registrations.GetOrAdd(eventType, et => EventRegistration.Create()); configure(registration); return this; diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusServiceCollectionExtensions.cs b/src/Tingle.EventBus/DependencyInjection/EventBusServiceCollectionExtensions.cs index b531f495..96891bc7 100644 --- a/src/Tingle.EventBus/DependencyInjection/EventBusServiceCollectionExtensions.cs +++ b/src/Tingle.EventBus/DependencyInjection/EventBusServiceCollectionExtensions.cs @@ -1,5 +1,10 @@ -using System.Diagnostics.CodeAnalysis; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System.Diagnostics.CodeAnalysis; +using System.Text.Json.Serialization; using Tingle.EventBus; +using Tingle.EventBus.Configuration; +using Tingle.EventBus.Serialization; namespace Microsoft.Extensions.DependencyInjection; @@ -8,9 +13,7 @@ namespace Microsoft.Extensions.DependencyInjection; /// public static class EventBusServiceCollectionExtensions { - /// - /// Add Event Bus services. - /// + /// Add Event Bus services. /// The instance to add services to. /// An to continue setting up the Event Bus. [RequiresDynamicCode(MessageStrings.RequiresDynamicCodeMessage)] @@ -19,15 +22,21 @@ public static EventBusBuilder AddEventBus(this IServiceCollection services) { if (services == null) throw new ArgumentNullException(nameof(services)); - return new EventBusBuilder(services).RegisterDefaultServices(); + var builder = new EventBusBuilder(services); + services.AddSingleton(); // can be multiple do not use TryAdd*(...) + builder.UseDefaultSerializer(provider => + { + var optionsAccessor = provider.GetRequiredService>(); + var loggerFactory = provider.GetRequiredService(); + return new DefaultJsonEventSerializer(optionsAccessor, loggerFactory); + }); + + return builder; } - /// - /// Add Event Bus services. - /// + /// Add Event Bus services. /// The instance to add services to. /// An optional action for setting up the bus. - /// [RequiresDynamicCode(MessageStrings.RequiresDynamicCodeMessage)] [RequiresUnreferencedCode(MessageStrings.RequiresUnreferencedCodeMessage)] public static IServiceCollection AddEventBus(this IServiceCollection services, Action? setupAction = null) @@ -35,17 +44,20 @@ public static IServiceCollection AddEventBus(this IServiceCollection services, A if (services == null) throw new ArgumentNullException(nameof(services)); var builder = services.AddEventBus(); - setupAction?.Invoke(builder); - return services; } - /// - /// Add Event Bus services. - /// + /// Add Event Bus services with minimal defaults. /// The instance to add services to. /// An to continue setting up the Event Bus. + /// + /// This does not include support for: + /// + /// binding from + /// JSON serialization and deserialization. + /// + /// public static EventBusBuilder AddSlimEventBus(this IServiceCollection services) { if (services == null) throw new ArgumentNullException(nameof(services)); @@ -53,20 +65,63 @@ public static EventBusBuilder AddSlimEventBus(this IServiceCollection services) return new EventBusBuilder(services); } - /// - /// Add Event Bus services. - /// + /// Add Event Bus services with minimal defaults and JSON source generation. + /// The instance to add services to. + /// + /// The to use for serialization. + /// Each event to should be registered using a . + /// + /// An to continue setting up the Event Bus. + /// This does not include support for binding from . + public static EventBusBuilder AddSlimEventBus(this IServiceCollection services, JsonSerializerContext serializerContext) + { + if (services == null) throw new ArgumentNullException(nameof(services)); + if (serializerContext == null) throw new ArgumentNullException(nameof(serializerContext)); + + var builder = services.AddSlimEventBus(); + builder.UseDefaultSerializer(provider => + { + var optionsAccessor = provider.GetRequiredService>(); + var loggerFactory = provider.GetRequiredService(); + return new DefaultJsonEventSerializer(serializerContext, optionsAccessor, loggerFactory); + }); + + return builder; + } + + /// Add Event Bus services with minimal defaults. /// The instance to add services to. /// An optional action for setting up the bus. - /// + /// + /// This does not include support for: + /// + /// binding from + /// JSON serialization and deserialization. + /// + /// public static IServiceCollection AddSlimEventBus(this IServiceCollection services, Action? setupAction = null) { if (services == null) throw new ArgumentNullException(nameof(services)); var builder = services.AddSlimEventBus(); - setupAction?.Invoke(builder); + return services; + } + + /// Add Event Bus services with minimal defaults and JSON source generation. + /// The instance to add services to. + /// + /// The to use for serialization. + /// Each event to should be registered using a . + /// + /// An optional action for setting up the bus. + /// This does not include support for binding from . + public static IServiceCollection AddSlimEventBus(this IServiceCollection services, JsonSerializerContext serializerContext, Action? setupAction = null) + { + if (services == null) throw new ArgumentNullException(nameof(services)); + var builder = services.AddSlimEventBus(serializerContext); + setupAction?.Invoke(builder); return services; } } diff --git a/src/Tingle.EventBus/EventBus.cs b/src/Tingle.EventBus/EventBus.cs index 5ffee412..03bcc28b 100644 --- a/src/Tingle.EventBus/EventBus.cs +++ b/src/Tingle.EventBus/EventBus.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; using Tingle.EventBus.Configuration; using Tingle.EventBus.Diagnostics; using Tingle.EventBus.Ids; @@ -40,9 +41,9 @@ public class EventBus(EventBusTransportProvider transportProvider, /// /// /// - public async Task PublishAsync(EventContext @event, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + public async Task PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class { if (scheduled != null && scheduled <= DateTimeOffset.UtcNow) @@ -90,9 +91,9 @@ public class EventBus(EventBusTransportProvider transportProvider, /// /// /// - public async Task?> PublishAsync(IList> events, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + public async Task?> PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class { if (scheduled != null && scheduled <= DateTimeOffset.UtcNow) @@ -140,7 +141,7 @@ public class EventBus(EventBusTransportProvider transportProvider, /// The scheduling identifier of the scheduled event. /// /// - public async Task CancelAsync(string id, CancellationToken cancellationToken = default) + public async Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(string id, CancellationToken cancellationToken = default) where TEvent : class { // Instrumentation @@ -164,7 +165,7 @@ public async Task CancelAsync(string id, CancellationToken cancellationT /// The scheduling identifiers of the scheduled events. /// /// - public async Task CancelAsync(IList ids, CancellationToken cancellationToken = default) + public async Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList ids, CancellationToken cancellationToken = default) where TEvent : class { // Instrumentation @@ -204,7 +205,7 @@ public async Task StopAsync(CancellationToken cancellationToken) await Task.WhenAll(tasks).ConfigureAwait(false); } - internal (EventRegistration registration, IEventBusTransport transport) GetTransportForEvent() + internal (EventRegistration registration, IEventBusTransport transport) GetTransportForEvent<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>() where TEvent : class { // get the transport var reg = GetOrCreateRegistration(); @@ -219,14 +220,14 @@ public async Task StopAsync(CancellationToken cancellationToken) internal IEventBusTransport GetTransportForEvent(string name) => transports[name]; - internal EventRegistration GetOrCreateRegistration() + internal EventRegistration GetOrCreateRegistration<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>() where TEvent : class { // if there's already a registration for the event return it return options.Registrations.GetOrAdd(typeof(TEvent), et => { // at this point, the registration does not exist; // create it and pass it through all the configurators. - var registration = new EventRegistration(et); + var registration = EventRegistration.Create(); foreach (var cfg in configurators.Reverse()) { cfg.Configure(registration, options); diff --git a/src/Tingle.EventBus/EventContext.cs b/src/Tingle.EventBus/EventContext.cs index b29a33f5..a34bc51e 100644 --- a/src/Tingle.EventBus/EventContext.cs +++ b/src/Tingle.EventBus/EventContext.cs @@ -1,5 +1,6 @@ using System.Diagnostics.CodeAnalysis; using System.Net.Mime; +using Tingle.EventBus.Internal; using Tingle.EventBus.Serialization; namespace Tingle.EventBus; @@ -181,7 +182,7 @@ public bool TryGetHeaderValue(string key, [NotNullWhen(true)] out T? value) w /// The context for a specific event. /// The type of event carried. -public class EventContext : EventContext where T : class +public class EventContext<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T> : EventContext where T : class { /// /// @@ -205,9 +206,23 @@ internal EventContext(IEventPublisher publisher, IEventEnvelope envelope, Des public T Event { get; set; } } +/// Contract for a context for a specific event that has been dead-lettered. +public interface IDeadLetteredEventContext +{ + /// + /// Gets the dead letter reason for the event. + /// + string? DeadLetterReason { get; set; } + + /// + /// Gets the dead letter error description for the event. + /// + string? DeadLetterErrorDescription { get; set; } +} + /// The context for a specific dead-lettered event. /// The type of event carried. -public class DeadLetteredEventContext : EventContext where T : class +public class DeadLetteredEventContext<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T> : EventContext, IDeadLetteredEventContext where T : class { internal DeadLetteredEventContext(IEventPublisher publisher, IEventEnvelope envelope, DeserializationContext deserializationContext) : base(publisher, envelope, deserializationContext.ContentType, deserializationContext.Identifier) @@ -220,13 +235,9 @@ internal DeadLetteredEventContext(IEventPublisher publisher, IEventEnvelope e /// public T Event { get; set; } - /// - /// Gets the dead letter reason for the event. - /// + /// public string? DeadLetterReason { get; set; } - /// - /// Gets the dead letter error description for the event. - /// + /// public string? DeadLetterErrorDescription { get; set; } } diff --git a/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs b/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs index 5401990c..b37d84b7 100644 --- a/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs +++ b/src/Tingle.EventBus/Extensions/ILoggerExtensions.cs @@ -1,5 +1,7 @@ -using Tingle.EventBus; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus; using Tingle.EventBus.Configuration; +using Tingle.EventBus.Internal; namespace Microsoft.Extensions.Logging; @@ -74,7 +76,7 @@ public static void SendingEvents(this ILogger logger, IList eventBusIds } } - public static void SendingEvents(this ILogger logger, IList> events, string transportName, DateTimeOffset? scheduled = null) + public static void SendingEvents<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(this ILogger logger, IList> events, string transportName, DateTimeOffset? scheduled = null) where T : class { if (!logger.IsEnabled(LogLevel.Information)) return; diff --git a/src/Tingle.EventBus/IEventConsumer.cs b/src/Tingle.EventBus/IEventConsumer.cs index afb39c97..836b2463 100644 --- a/src/Tingle.EventBus/IEventConsumer.cs +++ b/src/Tingle.EventBus/IEventConsumer.cs @@ -1,4 +1,7 @@ -namespace Tingle.EventBus; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus.Internal; + +namespace Tingle.EventBus; /// /// Contract describing a consumer of one or more events. @@ -11,7 +14,7 @@ public interface IEventConsumer /// /// Contract describing a consumer of an event. /// -public interface IEventConsumer : IEventConsumer where T : class +public interface IEventConsumer<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T> : IEventConsumer where T : class { /// /// Consume an event of the provided type. @@ -25,7 +28,7 @@ public interface IEventConsumer : IEventConsumer where T : class /// /// Contract describing a consumer of a dead-lettered event. /// -public interface IDeadLetteredEventConsumer : IEventConsumer where T : class +public interface IDeadLetteredEventConsumer<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T> : IEventConsumer where T : class { /// /// Consume a dead-lettered event of the provided type. diff --git a/src/Tingle.EventBus/Internal/ExecutionHelper.cs b/src/Tingle.EventBus/Internal/ExecutionHelper.cs new file mode 100644 index 00000000..70eeac05 --- /dev/null +++ b/src/Tingle.EventBus/Internal/ExecutionHelper.cs @@ -0,0 +1,72 @@ +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus.Configuration; +using Tingle.EventBus.Internal; +using Tingle.EventBus.Serialization; + +namespace Tingle.EventBus; + +internal static class ExecutionHelper +{ + /// Deserialize an event from a stream of bytes. + /// The event type to be deserialized. + /// The to use. + /// The to use. + /// The to use. + /// + public static async Task DeserializeToContextAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(IEventSerializer serializer, + DeserializationContext context, + IEventPublisher publisher, + CancellationToken cancellationToken = default) where T : class + { + var envelope = await serializer.DeserializeAsync(context, cancellationToken).ConfigureAwait(false) + ?? throw new InvalidOperationException($"Deserialization from '{context.Registration.EventType.Name}' resulted in null which is not allowed."); // throwing helps track the error + + // Create the context + return context.Deadletter + ? new DeadLetteredEventContext(publisher: publisher, envelope: envelope, deserializationContext: context) + : new EventContext(publisher: publisher, envelope: envelope, deserializationContext: context); + } + + public static async Task ConsumeAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IEventConsumer consumer, + EventRegistration registration, + EventConsumerRegistration ecr, + EventContext @event, + CancellationToken cancellationToken) where TEvent : class + { + // Consume the event with the consumer appropriately + if (consumer is IEventConsumer consumer_normal && @event is EventContext evt_normal) + { + // Invoke handler method, with resilience pipeline + await registration.ExecutionPipeline.ExecuteAsync( + async ct => await consumer_normal.ConsumeAsync(evt_normal, ct).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); + } + else if (consumer is IDeadLetteredEventConsumer consumer_deadletter && @event is DeadLetteredEventContext evt_deadletter) + { + // Invoke handler method, with resilience pipelines + await registration.ExecutionPipeline.ExecuteAsync( + async ct => await consumer_deadletter.ConsumeAsync(evt_deadletter, ct).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); + } + else + { + throw new InvalidOperationException($"Consumer '{ecr.ConsumerType.FullName}' can't consume '{@event.GetType().FullName}' events. This shouldn't happen. Please file an issue."); + } + } + + [RequiresDynamicCode(MessageStrings.GenericsDynamicCodeMessage)] + public static DeserializerDelegate MakeDelegate([DynamicallyAccessedMembers(TrimmingHelper.Event)] Type eventType) + { + var flags = System.Reflection.BindingFlags.Static | System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Public; + var mi = typeof(ExecutionHelper).GetMethod(nameof(DeserializeToContextAsync), flags) ?? throw new InvalidOperationException("Methods should not be null"); + var method = mi.MakeGenericMethod([eventType]); + return (serializer, context, publisher, cancellationToken) => (Task)method.Invoke(null, [serializer, context, publisher, cancellationToken])!; + } + + [RequiresDynamicCode(MessageStrings.GenericsDynamicCodeMessage)] + public static ConsumeDelegate MakeDelegate([DynamicallyAccessedMembers(TrimmingHelper.Event)] Type eventType, [DynamicallyAccessedMembers(TrimmingHelper.Consumer)] Type consumerType) + { + var flags = System.Reflection.BindingFlags.Static | System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Public; + var mi = typeof(ExecutionHelper).GetMethod(nameof(ConsumeAsync), flags) ?? throw new InvalidOperationException("Methods should not be null"); + var method = mi.MakeGenericMethod([eventType]); + return (consumer, registration, ecr, context, cancellationToken) => (Task)method.Invoke(null, [consumer, registration, ecr, context, cancellationToken])!; + } +} diff --git a/src/Tingle.EventBus/MessageStrings.cs b/src/Tingle.EventBus/MessageStrings.cs index 26cf099f..a4f4f2f8 100644 --- a/src/Tingle.EventBus/MessageStrings.cs +++ b/src/Tingle.EventBus/MessageStrings.cs @@ -12,4 +12,6 @@ internal class MessageStrings public const string RequiresUnreferencedCodeMessage = "JSON serialization, deserialization, and binding strongly typed objects to configuration values might require types that cannot be statically analyzed."; public const string RequiresDynamicCodeMessage = "JSON serialization, deserialization, and binding strongly typed objects to configuration values might require types that cannot be statically analyzed and might need runtime code generation. Use System.Text.Json source generation for native AOT applications."; + + public const string GenericsDynamicCodeMessage = "The native code for this instantiation might not be available at runtime"; } diff --git a/src/Tingle.EventBus/Publisher/EventPublisher.cs b/src/Tingle.EventBus/Publisher/EventPublisher.cs index 95f052c6..35f67eec 100644 --- a/src/Tingle.EventBus/Publisher/EventPublisher.cs +++ b/src/Tingle.EventBus/Publisher/EventPublisher.cs @@ -1,40 +1,43 @@ -namespace Tingle.EventBus; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus.Internal; + +namespace Tingle.EventBus; internal class EventPublisher(EventBus bus) : IEventPublisher { /// - public EventContext CreateEventContext(TEvent @event, string? correlationId = null) + public EventContext CreateEventContext<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(TEvent @event, string? correlationId = null) where TEvent : class { return new EventContext(this, @event); } /// - public async Task PublishAsync(EventContext @event, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + public async Task PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class { return await bus.PublishAsync(@event, scheduled, cancellationToken).ConfigureAwait(false); } /// - public async Task?> PublishAsync(IList> events, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + public async Task?> PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class { return await bus.PublishAsync(events: events, scheduled: scheduled, cancellationToken: cancellationToken).ConfigureAwait(false); } /// - public async Task CancelAsync(string id, CancellationToken cancellationToken = default) where TEvent : class + public async Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(string id, CancellationToken cancellationToken = default) where TEvent : class { await bus.CancelAsync(id: id, cancellationToken: cancellationToken).ConfigureAwait(false); } /// - public async Task CancelAsync(IList ids, CancellationToken cancellationToken = default) where TEvent : class + public async Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList ids, CancellationToken cancellationToken = default) where TEvent : class { await bus.CancelAsync(ids: ids, cancellationToken: cancellationToken).ConfigureAwait(false); } diff --git a/src/Tingle.EventBus/Publisher/IEventPublisher.cs b/src/Tingle.EventBus/Publisher/IEventPublisher.cs index f55e4494..178ba869 100644 --- a/src/Tingle.EventBus/Publisher/IEventPublisher.cs +++ b/src/Tingle.EventBus/Publisher/IEventPublisher.cs @@ -1,4 +1,7 @@ -namespace Tingle.EventBus; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus.Internal; + +namespace Tingle.EventBus; /// /// Contract describing a publisher of events. @@ -10,7 +13,7 @@ public interface IEventPublisher /// The event to be nested. /// The identifier of the event from which to create a child event. /// - EventContext CreateEventContext(TEvent @event, string? correlationId = null) + EventContext CreateEventContext<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(TEvent @event, string? correlationId = null) where TEvent : class; /// Publish an event. @@ -22,9 +25,9 @@ EventContext CreateEventContext(TEvent @event, string? correlati /// /// /// - Task PublishAsync(EventContext @event, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + Task PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class; /// Publish a batch of events. @@ -36,9 +39,9 @@ EventContext CreateEventContext(TEvent @event, string? correlati /// /// /// - Task?> PublishAsync(IList> events, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + Task?> PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class; /// Cancel a scheduled event. @@ -46,7 +49,7 @@ EventContext CreateEventContext(TEvent @event, string? correlati /// The scheduling identifier of the scheduled event. /// /// - Task CancelAsync(string id, CancellationToken cancellationToken = default) + Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(string id, CancellationToken cancellationToken = default) where TEvent : class; /// @@ -56,6 +59,6 @@ Task CancelAsync(string id, CancellationToken cancellationToken = defaul /// The scheduling identifiers of the scheduled events. /// /// - Task CancelAsync(IList ids, CancellationToken cancellationToken = default) + Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList ids, CancellationToken cancellationToken = default) where TEvent : class; } diff --git a/src/Tingle.EventBus/Publisher/IEventPublisherExtensions.cs b/src/Tingle.EventBus/Publisher/IEventPublisherExtensions.cs index 223dcaf9..00c1cc43 100644 --- a/src/Tingle.EventBus/Publisher/IEventPublisherExtensions.cs +++ b/src/Tingle.EventBus/Publisher/IEventPublisherExtensions.cs @@ -1,4 +1,6 @@ -using Tingle.EventBus.Retries; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus.Internal; +using Tingle.EventBus.Retries; namespace Tingle.EventBus; @@ -19,10 +21,10 @@ public static class IEventPublisherExtensions /// /// /// - public static Task PublishAsync(this IEventPublisher publisher, - TEvent @event, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + public static Task PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(this IEventPublisher publisher, + TEvent @event, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class { var context = publisher.CreateEventContext(@event); @@ -39,10 +41,10 @@ public static class IEventPublisherExtensions /// /// /// - public static Task?> PublishAsync(this IEventPublisher publisher, - IList events, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) where TEvent : class + public static Task?> PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(this IEventPublisher publisher, + IList events, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class { var contexts = events.Select(e => publisher.CreateEventContext(e)).ToList(); return publisher.PublishAsync(events: contexts, scheduled: scheduled, cancellationToken: cancellationToken); @@ -59,10 +61,10 @@ public static class IEventPublisherExtensions /// The duration of time to wait before the event is available on the bus for consumption. /// /// - public static Task PublishAsync(this IEventPublisher publisher, - EventContext @event, - TimeSpan? delay, - CancellationToken cancellationToken = default) + public static Task PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(this IEventPublisher publisher, + EventContext @event, + TimeSpan? delay, + CancellationToken cancellationToken = default) where TEvent : class { var scheduled = delay is null ? (DateTimeOffset?)null : DateTimeOffset.UtcNow + delay.Value; @@ -76,10 +78,10 @@ public static class IEventPublisherExtensions /// The duration of time to wait before the event is available on the bus for consumption. /// /// - public static Task?> PublishAsync(this IEventPublisher publisher, - IList> events, - TimeSpan? delay, - CancellationToken cancellationToken = default) + public static Task?> PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(this IEventPublisher publisher, + IList> events, + TimeSpan? delay, + CancellationToken cancellationToken = default) where TEvent : class { var scheduled = delay is null ? (DateTimeOffset?)null : DateTimeOffset.UtcNow + delay.Value; @@ -93,10 +95,10 @@ public static class IEventPublisherExtensions /// The duration of time to wait before the event is available on the bus for consumption. /// /// - public static Task PublishAsync(this IEventPublisher publisher, - TEvent @event, - TimeSpan? delay, - CancellationToken cancellationToken = default) + public static Task PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(this IEventPublisher publisher, + TEvent @event, + TimeSpan? delay, + CancellationToken cancellationToken = default) where TEvent : class { var scheduled = delay is null ? (DateTimeOffset?)null : DateTimeOffset.UtcNow + delay.Value; @@ -110,10 +112,10 @@ public static class IEventPublisherExtensions /// The duration of time to wait before the event is available on the bus for consumption. /// /// - public static Task?> PublishAsync(this IEventPublisher publisher, - IList events, - TimeSpan? delay, - CancellationToken cancellationToken = default) + public static Task?> PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(this IEventPublisher publisher, + IList events, + TimeSpan? delay, + CancellationToken cancellationToken = default) where TEvent : class { var scheduled = delay is null ? (DateTimeOffset?)null : DateTimeOffset.UtcNow + delay.Value; @@ -130,9 +132,9 @@ public static class IEventPublisherExtensions /// The scheduling result. /// /// - public static Task CancelAsync(this IEventPublisher publisher, - ScheduledResult result, - CancellationToken cancellationToken = default) + public static Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(this IEventPublisher publisher, + ScheduledResult result, + CancellationToken cancellationToken = default) where TEvent : class { return publisher.CancelAsync(id: result.Id, cancellationToken: cancellationToken); @@ -144,9 +146,9 @@ public static Task CancelAsync(this IEventPublisher publisher, /// The scheduling results. /// /// - public static Task CancelAsync(this IEventPublisher publisher, - IList results, - CancellationToken cancellationToken = default) + public static Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(this IEventPublisher publisher, + IList results, + CancellationToken cancellationToken = default) where TEvent : class { var ids = results.Select(r => r.Id).ToList(); @@ -163,9 +165,9 @@ public static Task CancelAsync(this IEventPublisher publisher, /// The event to publish. /// /// - public static Task ScheduleRetryAsync(this IEventPublisher publisher, - TEvent @event, - CancellationToken cancellationToken = default) + public static Task ScheduleRetryAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(this IEventPublisher publisher, + TEvent @event, + CancellationToken cancellationToken = default) where TEvent : class, IRetryableEvent { if (@event.TryGetNextRetryDelay(out var delay)) @@ -184,9 +186,9 @@ public static Task CancelAsync(this IEventPublisher publisher, /// The event to publish. /// /// - public static Task ScheduleRetryAsync(this IEventPublisher publisher, - EventContext @event, - CancellationToken cancellationToken = default) + public static Task ScheduleRetryAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(this IEventPublisher publisher, + EventContext @event, + CancellationToken cancellationToken = default) where TEvent : class, IRetryableEvent { if (@event.Event.TryGetNextRetryDelay(out var delay)) diff --git a/src/Tingle.EventBus/Publisher/WrappedEventPublisher.cs b/src/Tingle.EventBus/Publisher/WrappedEventPublisher.cs index 729981c0..12922d98 100644 --- a/src/Tingle.EventBus/Publisher/WrappedEventPublisher.cs +++ b/src/Tingle.EventBus/Publisher/WrappedEventPublisher.cs @@ -1,4 +1,7 @@ -namespace Tingle.EventBus; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus.Internal; + +namespace Tingle.EventBus; /// /// An implementation of @@ -10,37 +13,37 @@ public class WrappedEventPublisher(IEventPublisher inner) : IEventPublisher private readonly IEventPublisher inner = inner ?? throw new ArgumentNullException(nameof(inner)); /// - public Task CancelAsync(string id, CancellationToken cancellationToken = default) where TEvent : class + public Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(string id, CancellationToken cancellationToken = default) where TEvent : class { return inner.CancelAsync(id, cancellationToken); } /// - public Task CancelAsync(IList ids, CancellationToken cancellationToken = default) where TEvent : class + public Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList ids, CancellationToken cancellationToken = default) where TEvent : class { return inner.CancelAsync(ids, cancellationToken); } /// - public EventContext CreateEventContext(TEvent @event, string? correlationId = null) + public EventContext CreateEventContext<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(TEvent @event, string? correlationId = null) where TEvent : class { return inner.CreateEventContext(@event, correlationId); } /// - public Task PublishAsync(EventContext @event, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + public Task PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class { return inner.PublishAsync(@event, scheduled, cancellationToken); } /// - public Task?> PublishAsync(IList> events, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + public Task?> PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class { return inner.PublishAsync(@events, scheduled, cancellationToken); diff --git a/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs b/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs index d4a3def5..bd5fd589 100644 --- a/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs +++ b/src/Tingle.EventBus/Serialization/AbstractEventSerializer.cs @@ -1,9 +1,11 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using System.Diagnostics.CodeAnalysis; using System.Net.Mime; using System.Text.RegularExpressions; using Tingle.EventBus.Diagnostics; +using Tingle.EventBus.Internal; namespace Tingle.EventBus.Serialization; @@ -45,7 +47,7 @@ protected AbstractEventSerializer(IOptionsMonitor protected ILogger Logger { get; } /// - public async Task?> DeserializeAsync(DeserializationContext context, CancellationToken cancellationToken = default) + public async Task?> DeserializeAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(DeserializationContext context, CancellationToken cancellationToken = default) where T : class { // Assume first media type if none is specified @@ -78,9 +80,7 @@ protected AbstractEventSerializer(IOptionsMonitor } /// - public async Task SerializeAsync(SerializationContext context, - CancellationToken cancellationToken = default) - where T : class + public async Task SerializeAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(SerializationContext context, CancellationToken cancellationToken = default) where T : class { // Assume first media type if none is specified context.Event.ContentType ??= new ContentType(SupportedMediaTypes[0]); diff --git a/src/Tingle.EventBus/Serialization/DefaultJsonEventSerializer.cs b/src/Tingle.EventBus/Serialization/DefaultJsonEventSerializer.cs index cf085c68..54d8cb1c 100644 --- a/src/Tingle.EventBus/Serialization/DefaultJsonEventSerializer.cs +++ b/src/Tingle.EventBus/Serialization/DefaultJsonEventSerializer.cs @@ -3,19 +3,36 @@ using Microsoft.Extensions.Options; using System.Diagnostics.CodeAnalysis; using System.Text.Json; +using System.Text.Json.Serialization; namespace Tingle.EventBus.Serialization; /// /// The default implementation of for JSON using the System.Text.Json library. /// -/// The options for configuring the serializer. -/// -[RequiresDynamicCode(MessageStrings.JsonSerializationRequiresDynamicCodeMessage)] -[RequiresUnreferencedCode(MessageStrings.JsonSerializationUnreferencedCodeMessage)] -public class DefaultJsonEventSerializer(IOptionsMonitor optionsAccessor, ILoggerFactory loggerFactory) - : AbstractEventSerializer(optionsAccessor, loggerFactory) +public class DefaultJsonEventSerializer : AbstractEventSerializer { + private readonly JsonSerializerContext? serializerContext; + + /// + /// + /// The options for configuring the serializer. + /// + [RequiresDynamicCode(MessageStrings.JsonSerializationRequiresDynamicCodeMessage)] + [RequiresUnreferencedCode(MessageStrings.JsonSerializationUnreferencedCodeMessage)] + public DefaultJsonEventSerializer(IOptionsMonitor optionsAccessor, ILoggerFactory loggerFactory) : base(optionsAccessor, loggerFactory) { } + + /// + /// + /// + /// The options for configuring the serializer. + /// + public DefaultJsonEventSerializer(JsonSerializerContext serializerContext, IOptionsMonitor optionsAccessor, ILoggerFactory loggerFactory) + : base(optionsAccessor, loggerFactory) + { + this.serializerContext = serializerContext ?? throw new ArgumentNullException(nameof(serializerContext)); + } + /// protected override IList SupportedMediaTypes => JsonContentTypes; @@ -25,9 +42,23 @@ public class DefaultJsonEventSerializer(IOptionsMonitor>(utf8Json: stream, - options: serializerOptions, - cancellationToken: cancellationToken).ConfigureAwait(false); + if (serializerContext is null) + { +#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code +#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. + return await JsonSerializer.DeserializeAsync>(utf8Json: stream, + options: serializerOptions, + cancellationToken: cancellationToken).ConfigureAwait(false); +#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. +#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code + } + else + { + return await JsonSerializer.DeserializeAsync(utf8Json: stream, + returnType: typeof(EventEnvelope), + context: serializerContext, + cancellationToken: cancellationToken).ConfigureAwait(false) as EventEnvelope; + } } /// @@ -36,9 +67,24 @@ protected override async Task SerializeEnvelopeAsync(Stream stream, CancellationToken cancellationToken = default) { var serializerOptions = OptionsAccessor.CurrentValue.SerializerOptions; - await JsonSerializer.SerializeAsync(utf8Json: stream, - value: envelope, - options: serializerOptions, - cancellationToken: cancellationToken).ConfigureAwait(false); + if (serializerContext is null) + { +#pragma warning disable IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code +#pragma warning disable IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. + await JsonSerializer.SerializeAsync(utf8Json: stream, + value: envelope, + options: serializerOptions, + cancellationToken: cancellationToken).ConfigureAwait(false); +#pragma warning restore IL3050 // Calling members annotated with 'RequiresDynamicCodeAttribute' may break functionality when AOT compiling. +#pragma warning restore IL2026 // Members annotated with 'RequiresUnreferencedCodeAttribute' require dynamic access otherwise can break functionality when trimming application code + } + else + { + await JsonSerializer.SerializeAsync(utf8Json: stream, + value: envelope, + inputType: typeof(EventEnvelope), + context: serializerContext, + cancellationToken: cancellationToken).ConfigureAwait(false); + } } } diff --git a/src/Tingle.EventBus/Serialization/DeserializationContext.cs b/src/Tingle.EventBus/Serialization/DeserializationContext.cs index 9e3c1baa..e85c7871 100644 --- a/src/Tingle.EventBus/Serialization/DeserializationContext.cs +++ b/src/Tingle.EventBus/Serialization/DeserializationContext.cs @@ -6,8 +6,9 @@ namespace Tingle.EventBus.Serialization; /// Context for performing deserialization. /// The containing the raw data. /// Registration for this event being deserialized. +/// Whether the event is from a dead-letter entity. /// Identifier given by the transport for the event to be deserialized. -public sealed class DeserializationContext(BinaryData body, EventRegistration registration, string? identifier = null) +public sealed class DeserializationContext(BinaryData body, EventRegistration registration, bool deadletter, string? identifier = null) { /// /// The containing the raw data. @@ -19,6 +20,11 @@ public sealed class DeserializationContext(BinaryData body, EventRegistration re /// public EventRegistration Registration { get; } = registration ?? throw new ArgumentNullException(nameof(registration)); + /// + /// Whether the event is from a dead-letter entity. + /// + public bool Deadletter { get; } = deadletter; + /// /// Identifier given by the transport for the event to be deserialized. /// diff --git a/src/Tingle.EventBus/Serialization/IEventSerializer.cs b/src/Tingle.EventBus/Serialization/IEventSerializer.cs index f86b6789..76ab6ab4 100644 --- a/src/Tingle.EventBus/Serialization/IEventSerializer.cs +++ b/src/Tingle.EventBus/Serialization/IEventSerializer.cs @@ -1,25 +1,22 @@ -namespace Tingle.EventBus.Serialization; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus.Internal; + +namespace Tingle.EventBus.Serialization; /// /// A message serializer is responsible for serializing and deserializing an event. /// public interface IEventSerializer { - /// - /// Serialize an event into a stream of bytes. - /// + /// Serialize an event into a stream of bytes. /// The event type to be serialized. /// The to use. /// - /// - Task SerializeAsync(SerializationContext context, CancellationToken cancellationToken = default) where T : class; + Task SerializeAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(SerializationContext context, CancellationToken cancellationToken = default) where T : class; - /// - /// Deserialize an event from a stream of bytes. - /// + /// Deserialize an event from a stream of bytes. /// The event type to be deserialized. /// The to use. /// - /// - Task?> DeserializeAsync(DeserializationContext context, CancellationToken cancellationToken = default) where T : class; + Task?> DeserializeAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(DeserializationContext context, CancellationToken cancellationToken = default) where T : class; } diff --git a/src/Tingle.EventBus/Serialization/SerializationContext.cs b/src/Tingle.EventBus/Serialization/SerializationContext.cs index b5f00569..379dc52e 100644 --- a/src/Tingle.EventBus/Serialization/SerializationContext.cs +++ b/src/Tingle.EventBus/Serialization/SerializationContext.cs @@ -1,11 +1,13 @@ -using Tingle.EventBus.Configuration; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus.Configuration; +using Tingle.EventBus.Internal; namespace Tingle.EventBus.Serialization; /// Context for performing serialization. /// The event to be serialized. /// Registration for this event being deserialized. -public sealed class SerializationContext(EventContext @event, EventRegistration registration) where T : class +public sealed class SerializationContext<[DynamicallyAccessedMembers(TrimmingHelper.Event)] T>(EventContext @event, EventRegistration registration) where T : class { /// /// The event to be serialized. diff --git a/src/Tingle.EventBus/Transports/EventBusTransport.cs b/src/Tingle.EventBus/Transports/EventBusTransport.cs index 7668c3d9..458d2b80 100644 --- a/src/Tingle.EventBus/Transports/EventBusTransport.cs +++ b/src/Tingle.EventBus/Transports/EventBusTransport.cs @@ -45,20 +45,16 @@ public EventBusTransport(IServiceScopeFactory scopeFactory, Logger = loggerFactory?.CreateLogger(categoryName) ?? throw new ArgumentNullException(nameof(loggerFactory)); } - /// - /// Options for configuring the bus. - /// + /// Options for configuring the bus. protected EventBusOptions BusOptions { get; } /// protected EventBusTransportRegistration Registration { get; private set; } = default!; - /// - /// Options for configuring the transport. - /// + /// Options for configuring the transport. protected TOptions Options { get; private set; } = default!; - /// + /// Logger for the transport. protected ILogger Logger { get; } /// @@ -130,10 +126,10 @@ private Task WaitStartedAsync(CancellationToken cancellationToken) #region Publishing /// - public virtual async Task PublishAsync(EventContext @event, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + public virtual async Task PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class { // publish, with resilience pipelines @@ -145,10 +141,10 @@ private Task WaitStartedAsync(CancellationToken cancellationToken) } /// - public virtual async Task?> PublishAsync(IList> events, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + public virtual async Task?> PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class { // publish, with resilience pipelines @@ -168,10 +164,10 @@ private Task WaitStartedAsync(CancellationToken cancellationToken) /// Set for immediate availability. /// /// - protected abstract Task PublishCoreAsync(EventContext @event, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected abstract Task PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class; /// Publish a batch of events on the transport. @@ -183,10 +179,10 @@ private Task WaitStartedAsync(CancellationToken cancellationToken) /// Set for immediate availability. /// /// - protected abstract Task?> PublishCoreAsync(IList> events, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + protected abstract Task?> PublishCoreAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class; #endregion @@ -194,7 +190,7 @@ private Task WaitStartedAsync(CancellationToken cancellationToken) #region Canceling /// - public virtual async Task CancelAsync(string id, EventRegistration registration, CancellationToken cancellationToken = default) + public virtual async Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(string id, EventRegistration registration, CancellationToken cancellationToken = default) where TEvent : class { // cancel, with resilience pipelines @@ -206,7 +202,7 @@ await registration.ExecutionPipeline.ExecuteAsync( } /// - public virtual async Task CancelAsync(IList ids, EventRegistration registration, CancellationToken cancellationToken = default) + public virtual async Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList ids, EventRegistration registration, CancellationToken cancellationToken = default) where TEvent : class { // cancel, with resilience pipelines @@ -241,10 +237,7 @@ protected abstract Task CancelCoreAsync(IList ids, #region Serialization - /// - /// Deserialize an event from a stream of bytes. - /// - /// The event type to be deserialized. + /// Deserialize an event from a stream of bytes. /// The scope in which to resolve required services. /// The containing the raw data. /// The type of content contained in the . @@ -254,49 +247,40 @@ protected abstract Task CancelCoreAsync(IList ids, /// Whether the event is from a dead-letter entity. /// /// - protected async Task DeserializeAsync(IServiceScope scope, - BinaryData body, - ContentType? contentType, - EventRegistration registration, - string? identifier, - object? raw, - bool deadletter, - CancellationToken cancellationToken = default) - where TEvent : class + protected async Task DeserializeAsync(IServiceScope scope, + BinaryData body, + ContentType? contentType, + EventRegistration registration, + string? identifier, + object? raw, + bool deadletter, + CancellationToken cancellationToken = default) { // Resolve the serializer var provider = scope.ServiceProvider; var serializer = (IEventSerializer)ActivatorUtilities.GetServiceOrCreateInstance(provider, registration.EventSerializerType!); - // Deserialize the content into an envelope - var ctx = new DeserializationContext(body, registration, identifier) + // Deserialize + var publisher = provider.GetRequiredService(); + var ctx = new DeserializationContext(body, registration, deadletter, identifier) { ContentType = contentType, RawTransportData = raw, }; - var envelope = await serializer.DeserializeAsync(ctx, cancellationToken).ConfigureAwait(false) - ?? throw new InvalidOperationException($"Deserialization from '{typeof(TEvent).Name}' resulted in null which is not allowed."); // throwing helps track the error - - // Create the context - var publisher = provider.GetRequiredService(); - return deadletter - ? new DeadLetteredEventContext(publisher: publisher, envelope: envelope, deserializationContext: ctx) - : new EventContext(publisher: publisher, envelope: envelope, deserializationContext: ctx); + return await registration.Deserializer(serializer, ctx, publisher, cancellationToken).ConfigureAwait(false); } - /// - /// Serialize an event into a stream of bytes. - /// + /// Serialize an event into a stream of bytes. /// The event type to be serialized. /// The scope in which to resolve required services. /// The context of the event to be serialized. /// The bus registration for this event. /// /// - protected async Task SerializeAsync(IServiceScope scope, - EventContext @event, - EventRegistration registration, - CancellationToken cancellationToken = default) + protected async Task SerializeAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IServiceScope scope, + EventContext @event, + EventRegistration registration, + CancellationToken cancellationToken = default) where TEvent : class { // Resolve the serializer @@ -314,47 +298,25 @@ protected async Task SerializeAsync(IServiceScope scope, #region Consuming - /// - /// Push an incoming event to the consumer responsible for it. - /// - /// The event type. - /// The type of consumer. + /// Push an incoming event to the consumer responsible for it. + /// The scope in which to resolve required services. /// The for the current event. /// The for the current event. /// The context containing the event. - /// The scope in which to resolve required services. - /// An representing the state of the action. /// - protected async Task ConsumeAsync(EventRegistration registration, - EventConsumerRegistration ecr, - EventContext @event, - IServiceScope scope, - CancellationToken cancellationToken) - where TConsumer : IEventConsumer - where TEvent : class + /// An representing the state of the action. + protected async Task ConsumeAsync(IServiceScope scope, + EventRegistration registration, + EventConsumerRegistration ecr, + EventContext @event, + CancellationToken cancellationToken) { try { // Resolve the consumer - var consumer = ActivatorUtilities.GetServiceOrCreateInstance(scope.ServiceProvider); + var consumer = (IEventConsumer)ActivatorUtilities.GetServiceOrCreateInstance(scope.ServiceProvider, ecr.ConsumerType); - // Consume the event with the consumer appropriately - if (consumer is IEventConsumer consumer_normal && @event is EventContext evt_normal) - { - // Invoke handler method, with resilience pipeline - await registration.ExecutionPipeline.ExecuteAsync( - async ct => await consumer_normal.ConsumeAsync(evt_normal, ct).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); - } - else if (consumer is IDeadLetteredEventConsumer consumer_deadletter && @event is DeadLetteredEventContext evt_deadletter) - { - // Invoke handler method, with resilience pipelines - await registration.ExecutionPipeline.ExecuteAsync( - async ct => await consumer_deadletter.ConsumeAsync(evt_deadletter, ct).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); - } - else - { - throw new InvalidOperationException($"Consumer '{typeof(TConsumer).FullName}' can't consume '{@event.GetType().FullName}' events. This shouldn't happen. Please file an issue."); - } + await ecr.Consume(consumer, registration, ecr, @event, cancellationToken).ConfigureAwait(false); return new EventConsumeResult(successful: true, exception: null); } @@ -380,19 +342,14 @@ await registration.ExecutionPipeline.ExecuteAsync( #region Registrations - /// - /// Gets the consumer registrations for this transport. - /// - /// + /// Gets the consumer registrations for this transport. protected ICollection GetRegistrations() => BusOptions.GetRegistrations(transportName: Name); #endregion #region Logging - /// - /// Begins a logical operation scope for logging. - /// + /// Begins a logical operation scope for logging. /// /// /// diff --git a/src/Tingle.EventBus/Transports/IEventBusTransport.cs b/src/Tingle.EventBus/Transports/IEventBusTransport.cs index e4c3080e..24fcd583 100644 --- a/src/Tingle.EventBus/Transports/IEventBusTransport.cs +++ b/src/Tingle.EventBus/Transports/IEventBusTransport.cs @@ -1,4 +1,6 @@ -using Tingle.EventBus.Configuration; +using System.Diagnostics.CodeAnalysis; +using Tingle.EventBus.Configuration; +using Tingle.EventBus.Internal; namespace Tingle.EventBus.Transports; @@ -30,10 +32,10 @@ public interface IEventBusTransport /// Set for immediate availability. /// /// - Task PublishAsync(EventContext @event, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + Task PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(EventContext @event, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class; /// Publish a batch of events on the transport. @@ -45,10 +47,10 @@ public interface IEventBusTransport /// Set for immediate availability. /// /// - Task?> PublishAsync(IList> events, - EventRegistration registration, - DateTimeOffset? scheduled = null, - CancellationToken cancellationToken = default) + Task?> PublishAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList> events, + EventRegistration registration, + DateTimeOffset? scheduled = null, + CancellationToken cancellationToken = default) where TEvent : class; /// Cancel a scheduled event on the transport. @@ -56,9 +58,9 @@ public interface IEventBusTransport /// The scheduling identifier of the scheduled event. /// The registration for the event. /// - Task CancelAsync(string id, - EventRegistration registration, - CancellationToken cancellationToken = default) + Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(string id, + EventRegistration registration, + CancellationToken cancellationToken = default) where TEvent : class; /// Cancel a batch of scheduled events on the transport. @@ -66,9 +68,9 @@ Task CancelAsync(string id, /// The scheduling identifiers of the scheduled events. /// The registration for the events. /// - Task CancelAsync(IList ids, - EventRegistration registration, - CancellationToken cancellationToken = default) + Task CancelAsync<[DynamicallyAccessedMembers(TrimmingHelper.Event)] TEvent>(IList ids, + EventRegistration registration, + CancellationToken cancellationToken = default) where TEvent : class; /// diff --git a/tests/Tingle.EventBus.Tests/Configurator/MandatoryEventBusConfiguratorTests.cs b/tests/Tingle.EventBus.Tests/Configurator/MandatoryEventBusConfiguratorTests.cs index b517f65e..396d09ab 100644 --- a/tests/Tingle.EventBus.Tests/Configurator/MandatoryEventBusConfiguratorTests.cs +++ b/tests/Tingle.EventBus.Tests/Configurator/MandatoryEventBusConfiguratorTests.cs @@ -10,7 +10,7 @@ public class MandatoryEventBusConfiguratorTests public void ConfigureSerializer_UsesDefault() { // when not set, use default - var registration = new EventRegistration(typeof(TestEvent1)); + var registration = EventRegistration.Create(); Assert.Null(registration.EventSerializerType); MandatoryEventBusConfigurator.ConfigureSerializer(registration); Assert.Equal(typeof(IEventSerializer), registration.EventSerializerType); @@ -20,7 +20,7 @@ public void ConfigureSerializer_UsesDefault() public void ConfigureSerializer_RespectsAttribute() { // attribute is respected - var registration = new EventRegistration(typeof(TestEvent2)); + var registration = EventRegistration.Create(); Assert.Null(registration.EventSerializerType); MandatoryEventBusConfigurator.ConfigureSerializer(registration); Assert.Equal(typeof(FakeEventSerializer1), registration.EventSerializerType); @@ -30,7 +30,7 @@ public void ConfigureSerializer_RespectsAttribute() public void ConfigureSerializer_Throws_InvalidOperationException() { // attribute is respected - var registration = new EventRegistration(typeof(TestEvent3)); + var registration = EventRegistration.Create(); var ex = Assert.Throws(() => MandatoryEventBusConfigurator.ConfigureSerializer(registration)); Assert.Equal("The type 'Tingle.EventBus.Tests.Configurator.FakeEventSerializer2' is used" + " as a serializer but does not implement 'Tingle.EventBus.Serialization.IEventSerializer'", @@ -54,7 +54,7 @@ public void ConfigureEventName_Works(Type eventType, bool useFullTypeNames, stri options.Naming.Scope = scope; options.Naming.Convention = namingConvention; options.Naming.UseFullTypeNames = useFullTypeNames; - var registration = new EventRegistration(eventType); + var registration = EventRegistration.Create(eventType); MandatoryEventBusConfigurator.ConfigureEventName(registration, options.Naming); Assert.Equal(expected, registration.EventName); } @@ -132,8 +132,8 @@ public void SetConsumerName_Works(Type eventType, options.Naming.ConsumerNameSource = consumerNameSource; options.Naming.ConsumerNamePrefix = prefix; - var registration = new EventRegistration(eventType); - registration.Consumers.Add(new EventConsumerRegistration(consumerType, false)); + var registration = EventRegistration.Create(eventType); + registration.Consumers.Add(EventConsumerRegistration.Create(eventType, consumerType)); var creg = Assert.Single(registration.Consumers); MandatoryEventBusConfigurator.ConfigureEventName(registration, options.Naming); @@ -147,7 +147,7 @@ public void SetConsumerName_Works(Type eventType, [InlineData(typeof(TestEvent3), EntityKind.Broadcast)] public void ConfigureEntityKind_Works(Type eventType, EntityKind? expected) { - var registration = new EventRegistration(eventType); + var registration = EventRegistration.Create(eventType); MandatoryEventBusConfigurator.ConfigureEntityKind(registration); Assert.Equal(expected, registration.EntityKind); } diff --git a/tests/Tingle.EventBus.Tests/DefaultEventIdGeneratorTests.cs b/tests/Tingle.EventBus.Tests/DefaultEventIdGeneratorTests.cs index b7a35cca..67bc3c71 100644 --- a/tests/Tingle.EventBus.Tests/DefaultEventIdGeneratorTests.cs +++ b/tests/Tingle.EventBus.Tests/DefaultEventIdGeneratorTests.cs @@ -16,7 +16,8 @@ public class DefaultEventIdGeneratorTests public void GenerateEventId_Works(EventIdFormat format) { var generator = new DefaultEventIdGenerator(); - var reg = new EventRegistration(typeof(SampleEvent)) { IdFormat = format, }; + var reg = EventRegistration.Create(); + reg.IdFormat = format; var id = generator.Generate(reg); Assert.NotNull(id); } @@ -25,7 +26,8 @@ public void GenerateEventId_Works(EventIdFormat format) public void GenerateEventId_Throws_InvalidOperationExecption() { var generator = new DefaultEventIdGenerator(); - var reg = new EventRegistration(typeof(SampleEvent)) { IdFormat = null, }; + var reg = EventRegistration.Create(); + reg.IdFormat = null; var ex = Assert.Throws(() => generator.Generate(reg)); Assert.Equal($"'{nameof(EventIdFormat)}.{reg.IdFormat}' set on event '{reg.EventType.FullName}' is not supported.", ex.Message); } diff --git a/tests/Tingle.EventBus.Transports.Azure.EventHubs.Tests/IotHubEventSerializerTests.cs b/tests/Tingle.EventBus.Transports.Azure.EventHubs.Tests/IotHubEventSerializerTests.cs index 420e2b00..bd4853a3 100644 --- a/tests/Tingle.EventBus.Transports.Azure.EventHubs.Tests/IotHubEventSerializerTests.cs +++ b/tests/Tingle.EventBus.Transports.Azure.EventHubs.Tests/IotHubEventSerializerTests.cs @@ -29,7 +29,7 @@ private static (EventData, DeserializationContext) CreateData(EventRegistration foreach (var (key, value) in properties) ed.Properties[key] = value; - var ctx = new DeserializationContext(ed.EventBody, ereg) + var ctx = new DeserializationContext(ed.EventBody, ereg, false) { RawTransportData = ed, ContentType = new ContentType("application/json"), @@ -42,7 +42,7 @@ public async Task DeserializeAsync_Works_For_Telemetry() { await TestSerializerAsync(async (provider, _, serializer) => { - var ereg = new EventRegistration(typeof(MyIotHubEvent)); + var ereg = EventRegistration.Create(); var stream = TestSamples.GetIotHubTelemetry(); var (ed, ctx) = CreateData(ereg, await BinaryData.FromStreamAsync(stream), "Telemetry"); var envelope = await serializer.DeserializeAsync(ctx); @@ -60,7 +60,7 @@ public async Task DeserializeAsync_Works_For_TwinChangeEvents() { await TestSerializerAsync(async (provider, _, serializer) => { - var ereg = new EventRegistration(typeof(MyIotHubEvent)); + var ereg = EventRegistration.Create(); var stream = TestSamples.GetIotHubTwinChangeEvents(); var (ed, ctx) = CreateData(ereg, await BinaryData.FromStreamAsync(stream), "twinChangeEvents", new Dictionary { @@ -94,7 +94,7 @@ public async Task DeserializeAsync_Works_For_DeviceLifecycleEvents() { await TestSerializerAsync(async (provider, _, serializer) => { - var ereg = new EventRegistration(typeof(MyIotHubEvent)); + var ereg = EventRegistration.Create(); var stream = TestSamples.GetIotHubDeviceLifecycleEvents(); var (ed, ctx) = CreateData(ereg, await BinaryData.FromStreamAsync(stream), "deviceLifecycleEvents", new Dictionary { @@ -127,7 +127,7 @@ public async Task DeserializeAsync_Works_For_DeviceConnectionStateEvents() { await TestSerializerAsync(async (provider, _, serializer) => { - var ereg = new EventRegistration(typeof(MyIotHubEvent)); + var ereg = EventRegistration.Create(); var stream = TestSamples.GetIotHubDeviceConnectionStateEvents(); var (ed, ctx) = CreateData(ereg, await BinaryData.FromStreamAsync(stream), "deviceConnectionStateEvents", new Dictionary { @@ -157,8 +157,8 @@ public async Task DeserializeAsync_Throws_NotSupportedException_For_UnsupportedT { await TestSerializerAsync(async (provider, _, serializer) => { - var ereg = new EventRegistration(typeof(DummyEvent1)); - var ctx = new DeserializationContext(BinaryData.FromString(""), ereg); + var ereg = EventRegistration.Create(); + var ctx = new DeserializationContext(BinaryData.FromString(""), ereg, false); var ex = await Assert.ThrowsAsync(() => serializer.DeserializeAsync(ctx)); Assert.Equal("Only events that inherit from 'Tingle.EventBus.Transports.Azure.EventHubs.IotHub.IotHubEvent' are supported for deserialization.", ex.Message); }); @@ -169,7 +169,7 @@ public async Task SerializeAsync_Throws_NotSupportedException() { await TestSerializerAsync(async (provider, publisher, serializer) => { - var ereg = new EventRegistration(typeof(MyIotHubEvent)); + var ereg = EventRegistration.Create(); var context = new EventContext(publisher, new()); var ctx = new SerializationContext(context, ereg); var ex = await Assert.ThrowsAsync(() => serializer.SerializeAsync(ctx));