From 33be58f13e45fc90c16842920ce8648d465f482c Mon Sep 17 00:00:00 2001 From: Maxwell Weru Date: Tue, 12 Dec 2023 09:42:16 +0300 Subject: [PATCH] Migrate to resilience policies in Polly v8 with a new example (#575) --- .../RabbitMqTransport.cs | 32 ++++++----- .../EventConsumerRegistration.cs | 2 +- .../Configuration/EventRegistration.cs | 18 +++---- .../DependencyInjection/EventBusOptions.cs | 12 ++--- .../Internal/EventBusConcurrentDictionary.cs | 2 +- src/Tingle.EventBus/Internal/PollyHelper.cs | 38 ------------- .../Internal/ResiliencePipelineHelper.cs | 53 +++++++++++++++++++ src/Tingle.EventBus/Tingle.EventBus.csproj | 4 +- .../Transports/EventBusTransport.cs | 50 ++++++++--------- .../Transports/EventBusTransportOptions.cs | 12 ++--- 10 files changed, 122 insertions(+), 101 deletions(-) delete mode 100644 src/Tingle.EventBus/Internal/PollyHelper.cs create mode 100644 src/Tingle.EventBus/Internal/ResiliencePipelineHelper.cs diff --git a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs index d607be26..78215ef9 100644 --- a/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs +++ b/src/Tingle.EventBus.Transports.RabbitMQ/RabbitMqTransport.cs @@ -23,7 +23,7 @@ public class RabbitMqTransport : EventBusTransport, ID { private readonly SemaphoreSlim connectionLock = new(1, 1); private readonly EventBusConcurrentDictionary subscriptionChannelsCache = new(); - private RetryPolicy? retryPolicy; + private ResiliencePipeline? resiliencePipeline; private IConnection? connection; private bool disposed; @@ -98,7 +98,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) // publish message string? scheduledId = null; - GetRetryPolicy().Execute(() => + GetResiliencePipeline().Execute(() => { // setup properties var properties = channel.CreateBasicProperties(); @@ -173,7 +173,7 @@ protected override async Task StopCoreAsync(CancellationToken cancellationToken) serializedEvents.Add((@event, @event.ContentType, body)); } - GetRetryPolicy().Execute(() => + GetResiliencePipeline().Execute(() => { var batch = channel.CreateBasicPublishBatch(); foreach (var (@event, contentType, body) in serializedEvents) @@ -241,16 +241,22 @@ protected override Task CancelCoreAsync(IList ids, throw new NotSupportedException("RabbitMQ does not support canceling published messages."); } - private RetryPolicy GetRetryPolicy() + private ResiliencePipeline GetResiliencePipeline() { - return retryPolicy ??= Policy.Handle() - .Or() - .WaitAndRetry(retryCount: Options.RetryCount, - sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)), - onRetry: (ex, time) => - { - Logger.LogError(ex, "RabbitMQ Client could not connect after {Timeout:n1}s ", time.TotalSeconds); - }); + return resiliencePipeline ??= new ResiliencePipelineBuilder() + .AddRetry(new RetryStrategyOptions + { + ShouldHandle = new PredicateBuilder().Handle() + .Handle(), + DelayGenerator = args => new ValueTask(TimeSpan.FromSeconds(Math.Pow(2, args.AttemptNumber))), + MaxRetryAttempts = Options.RetryCount, + OnRetry = args => + { + Logger.LogError(args.Outcome.Exception, "RabbitMQ Client could not connect after {Timeout:n1}s", args.RetryDelay.TotalSeconds); + return new ValueTask(); + }, + }) + .Build(); } private async Task ConnectConsumersAsync(CancellationToken cancellationToken) @@ -397,7 +403,7 @@ private async Task TryConnectAsync(CancellationToken cancellationToken) return true; } - GetRetryPolicy().Execute(() => + GetResiliencePipeline().Execute(() => { connection = Options.ConnectionFactory!.CreateConnection(); }); diff --git a/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs b/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs index 2f925d0e..68e13ec9 100644 --- a/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs +++ b/src/Tingle.EventBus/Configuration/EventConsumerRegistration.cs @@ -48,7 +48,7 @@ public EventConsumerRegistration(Type consumerType, bool deadletter) /// Defaults to . /// When this value is set, it overrides the default value set on the transport or the bus. ///
- /// When a retry policy is in force, only errors not handled by it will be subject to the value set here. + /// When a resilience pipeline is in force, only errors not handled by it will be subject to the value set here. /// public UnhandledConsumerErrorBehaviour? UnhandledErrorBehaviour { get; set; } diff --git a/src/Tingle.EventBus/Configuration/EventRegistration.cs b/src/Tingle.EventBus/Configuration/EventRegistration.cs index 2a2825db..8128c819 100644 --- a/src/Tingle.EventBus/Configuration/EventRegistration.cs +++ b/src/Tingle.EventBus/Configuration/EventRegistration.cs @@ -67,17 +67,17 @@ public EventRegistration(Type eventType) public TimeSpan? DuplicateDetectionDuration { get; set; } /// - /// The retry policy to apply specifically for this event. + /// The resiliency pipeline to apply specifically for this event. /// This is in addition to what may be provided by the SDKs for each transport. - /// When provided alongside policies on the transport and the bus, it is used as the inner most policy. + /// When provided alongside pipelines on the transport and the bus, it is used as the inner most pipeline. /// /// /// When a value is provided, the transport may extend the lock for the - /// message during consumption until the execution with retry policy completes successfully or not. + /// message during consumption until the execution with resiliency pipeline completes successfully or not. /// In such a case, ensure the execution timeout (sometimes called the visibility timeout - /// or lock duration) is set to accommodate the longest possible duration of the retry policy. + /// or lock duration) is set to accommodate the longest possible duration of the resiliency pipeline. /// - public AsyncPolicy? RetryPolicy { get; set; } + public ResiliencePipeline? ResiliencePipeline { get; set; } /// /// The list of consumers registered for this event. @@ -93,11 +93,11 @@ public EventRegistration(Type eventType) /// public IDictionary Metadata { get; set; } = new Dictionary(); - /// Whether the execution policies have been merged. - internal bool MergedExecutionPolicies { get; set; } = false; + /// Whether the execution pipelines have been merged. + internal bool MergedExecutionPipelines { get; set; } = false; - /// The final policy used in executions for the event and it's consumers. - internal IAsyncPolicy ExecutionPolicy { get; set; } = Policy.NoOpAsync(); + /// The final resilience pipeline used in executions for the event and it's consumers. + internal ResiliencePipeline ExecutionPipeline { get; set; } = ResiliencePipeline.Empty; #region Equality Overrides diff --git a/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs b/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs index 6d648335..7a45dfd4 100644 --- a/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs +++ b/src/Tingle.EventBus/DependencyInjection/EventBusOptions.cs @@ -32,15 +32,15 @@ public class EventBusOptions public bool DefaultTransportWaitStarted { get; set; } = true; /// - /// Optional retry policy to apply to the bus. - /// When provided alongside policies on the transport and the event registration, it is used as the putter most policy. + /// Optional resilience pipeline to apply to the bus. + /// When provided alongside pipelines on the transport and the event registration, it is used as the putter most pipeline. /// Defaults to . /// /// - /// To specify a value on an event registration, use . - /// To specify a value on a transport, use for the specific transport. + /// To specify a value on an event registration, use . + /// To specify a value on a transport, use for the specific transport. /// - public AsyncPolicy? RetryPolicy { get; set; } + public ResiliencePipeline? ResiliencePipeline { get; set; } /// /// Optional default format to use for generated event identifiers when for events where it is not specified. @@ -68,7 +68,7 @@ public class EventBusOptions /// Optional default behaviour for errors encountered in a consumer but are not handled. /// To specify a value per consumer, use the option. /// To specify a value per transport, use the option on the specific transport. - /// When an is in force, only errors that are not handled by it will be subject to the value set here. + /// When an is in force, only errors that are not handled by it will be subject to the value set here. /// Defaults to . /// public UnhandledConsumerErrorBehaviour? DefaultUnhandledConsumerErrorBehaviour { get; set; } diff --git a/src/Tingle.EventBus/Internal/EventBusConcurrentDictionary.cs b/src/Tingle.EventBus/Internal/EventBusConcurrentDictionary.cs index d403d6c8..ff88213f 100644 --- a/src/Tingle.EventBus/Internal/EventBusConcurrentDictionary.cs +++ b/src/Tingle.EventBus/Internal/EventBusConcurrentDictionary.cs @@ -72,7 +72,7 @@ public async Task GetOrAddAsync(TKey key, Func p is not null).Select(p => p!).ToArray(); - - return policies.Length switch - { - 0 => Policy.NoOpAsync(), // if there are none, return No-Op, if - 1 => policies[0], // a single policy can just be used (no need to combine) - _ => Policy.WrapAsync(policies), // more than one needs to be combined - }; - } -} diff --git a/src/Tingle.EventBus/Internal/ResiliencePipelineHelper.cs b/src/Tingle.EventBus/Internal/ResiliencePipelineHelper.cs new file mode 100644 index 00000000..4e44e118 --- /dev/null +++ b/src/Tingle.EventBus/Internal/ResiliencePipelineHelper.cs @@ -0,0 +1,53 @@ +using Microsoft.Extensions.DependencyInjection; +using Polly; +using Tingle.EventBus.Configuration; +using Tingle.EventBus.Transports; + +namespace Tingle.EventBus.Internal; + +internal static class ResiliencePipelineHelper +{ + public static void CombineIfNeeded(EventBusOptions busOptions, EventBusTransportOptions transportOptions, EventRegistration registration) + { + if (busOptions is null) throw new ArgumentNullException(nameof(busOptions)); + if (transportOptions is null) throw new ArgumentNullException(nameof(transportOptions)); + if (registration is null) throw new ArgumentNullException(nameof(registration)); + + // if the pipelines have been merged, there is no need to repeat the process + if (registration.MergedExecutionPipelines) return; + + registration.ExecutionPipeline = Combine(busOptions, transportOptions, registration); + registration.MergedExecutionPipelines = true; + } + + private static ResiliencePipeline Combine(EventBusOptions busOptions, EventBusTransportOptions transportOptions, EventRegistration registration) + { + var pipelines = new ResiliencePipeline?[] { + busOptions.ResiliencePipeline, // outer + transportOptions.ResiliencePipeline, + registration.ResiliencePipeline, // inner + }.Where(p => p is not null).Select(p => p!).ToArray(); + + return pipelines.Length switch + { + 0 => ResiliencePipeline.Empty, // if there are none, return empty + 1 => pipelines[0], // a single pipeline can just be used (no need to combine) + _ => new ResiliencePipelineBuilder().AddPipelines(pipelines).Build(), // more than one needs to be combined + }; + } + + private static ResiliencePipelineBuilder AddPipelines(this ResiliencePipelineBuilder builder, params ResiliencePipeline?[]? pipelines) + { + if (pipelines is null) return builder; + + foreach (var pipeline in pipelines) + { + if (pipeline is not null) + { + builder.AddPipeline(pipeline); + } + } + + return builder; + } +} diff --git a/src/Tingle.EventBus/Tingle.EventBus.csproj b/src/Tingle.EventBus/Tingle.EventBus.csproj index 0d73d35b..10217cfc 100644 --- a/src/Tingle.EventBus/Tingle.EventBus.csproj +++ b/src/Tingle.EventBus/Tingle.EventBus.csproj @@ -1,4 +1,4 @@ - + Event-Based framework for distributed applications. @@ -13,8 +13,8 @@ - + diff --git a/src/Tingle.EventBus/Transports/EventBusTransport.cs b/src/Tingle.EventBus/Transports/EventBusTransport.cs index b7f1dc2e..344a6888 100644 --- a/src/Tingle.EventBus/Transports/EventBusTransport.cs +++ b/src/Tingle.EventBus/Transports/EventBusTransport.cs @@ -78,14 +78,14 @@ public virtual void Initialize(EventBusTransportRegistration registration) public async Task StartAsync(CancellationToken cancellationToken) { /* - * Set the retry policy and unhandled error behaviour if not set. + * Set the resilience pipeline and unhandled error behaviour if not set. * Give priority to the transport default then the bus default. */ var registrations = GetRegistrations(); foreach (var reg in registrations) { - // Combine the retry policies - PollyHelper.CombineIfNeeded(BusOptions, Options, reg); + // Combine the resilience pipelines + ResiliencePipelineHelper.CombineIfNeeded(BusOptions, Options, reg); foreach (var ecr in reg.Consumers) { @@ -136,12 +136,12 @@ private Task WaitStartedAsync(CancellationToken cancellationToken) CancellationToken cancellationToken = default) where TEvent : class { - // publish, with resilience policies + // publish, with resilience pipelines await WaitStartedAsync(cancellationToken).ConfigureAwait(false); - PollyHelper.CombineIfNeeded(BusOptions, Options, registration); // ensure policy is set for non-consumer events + ResiliencePipelineHelper.CombineIfNeeded(BusOptions, Options, registration); // ensure pipeline is set for non-consumer events Logger.SendingEvent(eventBusId: @event.Id, transportName: Name, scheduled: scheduled); - return await registration.ExecutionPolicy.ExecuteAsync( - ct => PublishCoreAsync(@event, registration, scheduled, ct), cancellationToken).ConfigureAwait(false); + return await registration.ExecutionPipeline.ExecuteAsync( + async ct => await PublishCoreAsync(@event, registration, scheduled, ct).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); } /// @@ -151,12 +151,12 @@ private Task WaitStartedAsync(CancellationToken cancellationToken) CancellationToken cancellationToken = default) where TEvent : class { - // publish, with resilience policies + // publish, with resilience pipelines await WaitStartedAsync(cancellationToken).ConfigureAwait(false); - PollyHelper.CombineIfNeeded(BusOptions, Options, registration); // ensure policy is set for non-consumer events + ResiliencePipelineHelper.CombineIfNeeded(BusOptions, Options, registration); // ensure pipeline is set for non-consumer events Logger.SendingEvents(events, Name, scheduled); - return await registration.ExecutionPolicy.ExecuteAsync( - ct => PublishCoreAsync(events, registration, scheduled, ct), cancellationToken).ConfigureAwait(false); + return await registration.ExecutionPipeline.ExecuteAsync( + async ct => await PublishCoreAsync(events, registration, scheduled, ct).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); } /// Publish an event on the transport. @@ -197,24 +197,24 @@ private Task WaitStartedAsync(CancellationToken cancellationToken) public virtual async Task CancelAsync(string id, EventRegistration registration, CancellationToken cancellationToken = default) where TEvent : class { - // cancel, with resilience policies + // cancel, with resilience pipelines await WaitStartedAsync(cancellationToken).ConfigureAwait(false); - PollyHelper.CombineIfNeeded(BusOptions, Options, registration); // ensure policy is set for non-consumer events + ResiliencePipelineHelper.CombineIfNeeded(BusOptions, Options, registration); // ensure pipeline is set for non-consumer events Logger.CancelingEvent(id, Name); - await registration.ExecutionPolicy.ExecuteAsync( - ct => CancelCoreAsync(id, registration, ct), cancellationToken).ConfigureAwait(false); + await registration.ExecutionPipeline.ExecuteAsync( + async ct => await CancelCoreAsync(id, registration, ct).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); } /// public virtual async Task CancelAsync(IList ids, EventRegistration registration, CancellationToken cancellationToken = default) where TEvent : class { - // cancel, with resilience policies + // cancel, with resilience pipelines await WaitStartedAsync(cancellationToken).ConfigureAwait(false); - PollyHelper.CombineIfNeeded(BusOptions, Options, registration); // ensure policy is set for non-consumer events + ResiliencePipelineHelper.CombineIfNeeded(BusOptions, Options, registration); // ensure pipeline is set for non-consumer events Logger.CancelingEvents(ids, Name); - await registration.ExecutionPolicy.ExecuteAsync( - ct => CancelCoreAsync(ids, registration, ct), cancellationToken).ConfigureAwait(false); + await registration.ExecutionPipeline.ExecuteAsync( + async ct => await CancelCoreAsync(ids, registration, ct).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); } /// Cancel a scheduled event on the transport. @@ -346,15 +346,15 @@ protected async Task SerializeAsync(IServiceScope scope, // Consume the event with the consumer appropriately if (consumer is IEventConsumer consumer_normal && @event is EventContext evt_normal) { - // Invoke handler method, with resilience policies - await registration.ExecutionPolicy.ExecuteAsync( - ct => consumer_normal.ConsumeAsync(evt_normal, ct), cancellationToken).ConfigureAwait(false); + // Invoke handler method, with resilience pipeline + await registration.ExecutionPipeline.ExecuteAsync( + async ct => await consumer_normal.ConsumeAsync(evt_normal, ct).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); } else if (consumer is IDeadLetteredEventConsumer consumer_deadletter && @event is DeadLetteredEventContext evt_deadletter) { - // Invoke handler method, with resilience policies - await registration.ExecutionPolicy.ExecuteAsync( - ct => consumer_deadletter.ConsumeAsync(evt_deadletter, ct), cancellationToken).ConfigureAwait(false); + // Invoke handler method, with resilience pipelines + await registration.ExecutionPipeline.ExecuteAsync( + async ct => await consumer_deadletter.ConsumeAsync(evt_deadletter, ct).ConfigureAwait(false), cancellationToken).ConfigureAwait(false); } else { diff --git a/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs b/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs index 7437df90..a9b7e158 100644 --- a/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs +++ b/src/Tingle.EventBus/Transports/EventBusTransportOptions.cs @@ -21,16 +21,16 @@ public abstract class EventBusTransportOptions public bool? WaitTransportStarted { get; set; } /// - /// Optional retry policy to apply specifically for this transport. + /// Optional resilience pipeline to apply specifically for this transport. /// This is in addition to what may be provided by the transport SDKs. - /// When provided alongside policies on the bus and the event registration, + /// When provided alongside pipelines on the bus and the event registration, /// it is configured inner to the one on the bus and outer to the one on the event registration. /// /// - /// To specify a value on an event registration, use . - /// To specify a value on the bus, use . + /// To specify a value on an event registration, use . + /// To specify a value on the bus, use . /// - public AsyncPolicy? RetryPolicy { get; set; } + public ResiliencePipeline? ResiliencePipeline { get; set; } /// /// The delay to introduce every time zero messages are received. @@ -88,7 +88,7 @@ public abstract class EventBusTransportOptions /// Optional default behaviour for errors encountered in a consumer but are not handled. /// This value overrides the default value set on the bus via . /// To specify a value per consumer, use the option. - /// When an is in force, only errors that are not handled by it will be subject to the value set here. + /// When an is in force, only errors that are not handled by it will be subject to the value set here. /// Defaults to . /// public UnhandledConsumerErrorBehaviour? DefaultUnhandledConsumerErrorBehaviour { get; set; }