Skip to content

Commit

Permalink
Fix the behavior of OnHedging event (#1625)
Browse files Browse the repository at this point in the history
  • Loading branch information
martintmk authored Sep 26, 2023
1 parent 66fc2d7 commit 4759183
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 154 deletions.
3 changes: 2 additions & 1 deletion src/Polly.Core/Hedging/Controller/HedgingHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ namespace Polly.Hedging.Utils;

internal sealed record class HedgingHandler<T>(
Func<HedgingPredicateArguments<T>, ValueTask<bool>> ShouldHandle,
Func<HedgingActionGeneratorArguments<T>, Func<ValueTask<Outcome<T>>>?> ActionGenerator)
Func<HedgingActionGeneratorArguments<T>, Func<ValueTask<Outcome<T>>>?> ActionGenerator,
Func<OnHedgingArguments<T>, ValueTask>? OnHedging)
{
public Func<ValueTask<Outcome<T>>>? GenerateAction(HedgingActionGeneratorArguments<T> args)
{
Expand Down
17 changes: 17 additions & 0 deletions src/Polly.Core/Hedging/Controller/TaskExecution.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public async ValueTask<bool> InitializeAsync<TState>(
return true;
}

await HandleOnHedgingAsync(snapshot.Context, attemptNumber - 1).ConfigureAwait(Context.ContinueOnCapturedContext);

ExecutionTaskSafe = ExecuteSecondaryActionAsync(action);
}
else
Expand All @@ -137,6 +139,21 @@ public async ValueTask<bool> InitializeAsync<TState>(
return true;
}

private async Task HandleOnHedgingAsync(ResilienceContext primaryContext, int attemptNumber)
{
var args = new OnHedgingArguments<T>(
primaryContext,
Context,
attemptNumber);

_telemetry.Report(new(ResilienceEventSeverity.Warning, HedgingConstants.OnHedgingEventName), Context, args);

if (_handler.OnHedging is { } onHedging)
{
await onHedging(args).ConfigureAwait(Context.ContinueOnCapturedContext);
}
}

private HedgingActionGeneratorArguments<TResult> CreateArguments<TResult, TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<TResult>>> primaryCallback,
ResilienceContext primaryContext,
Expand Down
19 changes: 11 additions & 8 deletions src/Polly.Core/Hedging/HedgingActionGeneratorArguments.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@ namespace Polly.Hedging;
/// </summary>
/// <typeparam name="TResult">The type of the result.</typeparam>
/// <remarks>
/// The <see cref="PrimaryContext"/> represents the context that was received by the hedging strategy and used to execute the primary action.
/// To prevent race conditions, the hedging strategy then clones the primary context into <see cref="ActionContext"/> and uses it to execute the hedged action.
/// Every hedged action gets its own context that is cloned from the primary.
/// <para>
/// Always use the constructor when creating this struct, otherwise we do not guarantee binary compatibility.
/// </para>
/// </remarks>
public readonly struct HedgingActionGeneratorArguments<TResult>
{
/// <summary>
/// Initializes a new instance of the <see cref="HedgingActionGeneratorArguments{TResult}"/> struct.
/// </summary>
/// <param name="primaryContext">The primary resilience context.</param>
/// <param name="actionContext">
/// The context that will be passed to action generated by <see cref="HedgingStrategyOptions{TResult}.ActionGenerator"/>.
/// .</param>
/// <param name="primaryContext">The primary context received by the hedging strategy.</param>
/// <param name="actionContext">The action context. cloned from the primary context.</param>
/// <param name="attemptNumber">The zero-based hedging attempt number.</param>
/// <param name="callback">The callback passed to hedging strategy.</param>
/// <param name="callback">The callback passed to the hedging strategy.</param>
public HedgingActionGeneratorArguments(
ResilienceContext primaryContext,
ResilienceContext actionContext,
Expand All @@ -33,12 +36,12 @@ public HedgingActionGeneratorArguments(
}

/// <summary>
/// Gets the primary resilience context.
/// Gets the primary resilience context as received by the hedging strategy.
/// </summary>
public ResilienceContext PrimaryContext { get; }

/// <summary>
/// Gets the context that will be passed to action generated by <see cref="HedgingStrategyOptions{TResult}.ActionGenerator"/>.
/// Gets the action context that will be used for the hedged action.
/// </summary>
/// <remarks>
/// This context is cloned from <see cref="PrimaryContext"/>.
Expand All @@ -51,7 +54,7 @@ public HedgingActionGeneratorArguments(
public int AttemptNumber { get; }

/// <summary>
/// Gets the callback passed to hedging strategy.
/// Gets the callback passed to the hedging strategy.
/// </summary>
public Func<ResilienceContext, ValueTask<Outcome<TResult>>> Callback { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@ public static class HedgingResiliencePipelineBuilderExtensions

private static HedgingResilienceStrategy<TResult> CreateHedgingStrategy<TResult>(StrategyBuilderContext context, HedgingStrategyOptions<TResult> options)
{
var handler = new HedgingHandler<TResult>(options.ShouldHandle!, options.ActionGenerator);
var handler = new HedgingHandler<TResult>(options.ShouldHandle!, options.ActionGenerator, options.OnHedging);

return new HedgingResilienceStrategy<TResult>(
options.Delay,
options.MaxHedgedAttempts,
handler,
options.OnHedging,
options.DelayGenerator,
context.TimeProvider,
context.Telemetry);
Expand Down
34 changes: 1 addition & 33 deletions src/Polly.Core/Hedging/HedgingResilienceStrategy.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,26 @@
using System.Diagnostics.CodeAnalysis;
using Polly.Hedging.Controller;
using Polly.Hedging.Utils;
using Polly.Telemetry;

namespace Polly.Hedging;

internal sealed class HedgingResilienceStrategy<T> : ResilienceStrategy<T>
{
private readonly TimeProvider _timeProvider;
private readonly ResilienceStrategyTelemetry _telemetry;
private readonly HedgingController<T> _controller;

public HedgingResilienceStrategy(
TimeSpan hedgingDelay,
int maxHedgedAttempts,
HedgingHandler<T> hedgingHandler,
Func<OnHedgingArguments<T>, ValueTask>? onHedging,
Func<HedgingDelayGeneratorArguments, ValueTask<TimeSpan>>? hedgingDelayGenerator,
TimeProvider timeProvider,
ResilienceStrategyTelemetry telemetry)
{
HedgingDelay = hedgingDelay;
TotalAttempts = maxHedgedAttempts + 1; // include the initial attempt
DelayGenerator = hedgingDelayGenerator;
_timeProvider = timeProvider;
HedgingHandler = hedgingHandler;
OnHedging = onHedging;

_telemetry = telemetry;
_controller = new HedgingController<T>(telemetry, timeProvider, HedgingHandler, TotalAttempts);
}

Expand All @@ -38,8 +32,6 @@ public HedgingResilienceStrategy(

public HedgingHandler<T> HedgingHandler { get; }

public Func<OnHedgingArguments<T>, ValueTask>? OnHedging { get; }

[ExcludeFromCodeCoverage] // coverlet issue
protected internal override async ValueTask<Outcome<T>> ExecuteCore<TState>(
Func<ResilienceContext, TState, ValueTask<Outcome<T>>> callback,
Expand Down Expand Up @@ -71,11 +63,8 @@ private async ValueTask<Outcome<T>> ExecuteCoreAsync<TState>(
var cancellationToken = context.CancellationToken;
var continueOnCapturedContext = context.ContinueOnCapturedContext;

var attempt = -1;
while (true)
{
attempt++;
var start = _timeProvider.GetTimestamp();
if (cancellationToken.IsCancellationRequested)
{
return Outcome.FromException<T>(new OperationCanceledException(cancellationToken).TrySetStackTrace());
Expand All @@ -92,10 +81,6 @@ private async ValueTask<Outcome<T>> ExecuteCoreAsync<TState>(
var execution = await hedgingContext.TryWaitForCompletedExecutionAsync(delay).ConfigureAwait(continueOnCapturedContext);
if (execution is null)
{
// If completedHedgedTask is null it indicates that we still do not have any finished hedged task within the hedging delay.
// We will create additional hedged task in the next iteration.
await HandleOnHedgingAsync(
new OnHedgingArguments<T>(context, null, attempt, duration: delay)).ConfigureAwait(context.ContinueOnCapturedContext);
continue;
}

Expand All @@ -106,23 +91,6 @@ await HandleOnHedgingAsync(
execution.AcceptOutcome();
return outcome;
}

var executionTime = _timeProvider.GetElapsedTime(start);
await HandleOnHedgingAsync(
new OnHedgingArguments<T>(context, outcome, attempt, executionTime)).ConfigureAwait(context.ContinueOnCapturedContext);
}
}

private async ValueTask HandleOnHedgingAsync(OnHedgingArguments<T> args)
{
_telemetry.Report<OnHedgingArguments<T>, T>(new(ResilienceEventSeverity.Warning, HedgingConstants.OnHedgingEventName), args.Context, default, args);

if (OnHedging is not null)
{
// If nothing has been returned or thrown yet, the result is a transient failure,
// and other hedged request will be awaited.
// Before it, one needs to perform the task adjacent to each hedged call.
await OnHedging(args).ConfigureAwait(args.Context.ContinueOnCapturedContext);
}
}

Expand Down
34 changes: 17 additions & 17 deletions src/Polly.Core/Hedging/OnHedgingArguments.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,43 +7,43 @@ namespace Polly.Hedging;
/// </summary>
/// <typeparam name="TResult">The type of result.</typeparam>
/// <remarks>
/// The <see cref="PrimaryContext"/> represents the context that was received by the hedging strategy and used to execute the primary action.
/// To prevent race conditions, the hedging strategy then clones the primary context into <see cref="ActionContext"/> and uses it to execute the hedged action.
/// Every hedged action gets its own context that is cloned from the primary.
/// <para>
/// Always use the constructor when creating this struct, otherwise we do not guarantee binary compatibility.
/// </para>
/// </remarks>
public readonly struct OnHedgingArguments<TResult>
{
/// <summary>
/// Initializes a new instance of the <see cref="OnHedgingArguments{TResult}"/> struct.
/// </summary>
/// <param name="outcome">The context in which the resilience operation or event occurred.</param>
/// <param name="context">The outcome of the resilience operation or event.</param>
/// <param name="primaryContext">The primary context received by the hedging strategy.</param>
/// <param name="actionContext">The action context. cloned from the primary context.</param>
/// <param name="attemptNumber">The zero-based hedging attempt number.</param>
/// <param name="duration">The execution duration of hedging attempt or the hedging delay in case the attempt was not finished in time.</param>
public OnHedgingArguments(ResilienceContext context, Outcome<TResult>? outcome, int attemptNumber, TimeSpan duration)
public OnHedgingArguments(ResilienceContext primaryContext, ResilienceContext actionContext, int attemptNumber)
{
Context = context;
Outcome = outcome;
PrimaryContext = primaryContext;
ActionContext = actionContext;
AttemptNumber = attemptNumber;
Duration = duration;
}

/// <summary>
/// Gets the outcome that needs to be hedged, if any.
/// Gets the primary resilience context as received by the hedging strategy.
/// </summary>
/// <remarks>If this property is <see langword="null"/>, it's an indication that user-callback or hedged operation did not complete within the hedging delay.</remarks>
public Outcome<TResult>? Outcome { get; }
public ResilienceContext PrimaryContext { get; }

/// <summary>
/// Gets the context of this event.
/// Gets the action context that will be used for the hedged action.
/// </summary>
public ResilienceContext Context { get; }
/// <remarks>
/// This context is cloned from <see cref="PrimaryContext"/>.
/// </remarks>
public ResilienceContext ActionContext { get; }

/// <summary>
/// Gets the zero-based hedging attempt number.
/// </summary>
public int AttemptNumber { get; }

/// <summary>
/// Gets the execution duration of hedging attempt or the hedging delay in case the attempt was not finished in time.
/// </summary>
public TimeSpan Duration { get; }
}
7 changes: 3 additions & 4 deletions src/Polly.Core/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,11 @@ Polly.Hedging.HedgingStrategyOptions<TResult>.OnHedging.set -> void
Polly.Hedging.HedgingStrategyOptions<TResult>.ShouldHandle.get -> System.Func<Polly.Hedging.HedgingPredicateArguments<TResult>, System.Threading.Tasks.ValueTask<bool>>!
Polly.Hedging.HedgingStrategyOptions<TResult>.ShouldHandle.set -> void
Polly.Hedging.OnHedgingArguments<TResult>
Polly.Hedging.OnHedgingArguments<TResult>.ActionContext.get -> Polly.ResilienceContext!
Polly.Hedging.OnHedgingArguments<TResult>.AttemptNumber.get -> int
Polly.Hedging.OnHedgingArguments<TResult>.Context.get -> Polly.ResilienceContext!
Polly.Hedging.OnHedgingArguments<TResult>.Duration.get -> System.TimeSpan
Polly.Hedging.OnHedgingArguments<TResult>.OnHedgingArguments() -> void
Polly.Hedging.OnHedgingArguments<TResult>.OnHedgingArguments(Polly.ResilienceContext! context, Polly.Outcome<TResult>? outcome, int attemptNumber, System.TimeSpan duration) -> void
Polly.Hedging.OnHedgingArguments<TResult>.Outcome.get -> Polly.Outcome<TResult>?
Polly.Hedging.OnHedgingArguments<TResult>.OnHedgingArguments(Polly.ResilienceContext! primaryContext, Polly.ResilienceContext! actionContext, int attemptNumber) -> void
Polly.Hedging.OnHedgingArguments<TResult>.PrimaryContext.get -> Polly.ResilienceContext!
Polly.HedgingResiliencePipelineBuilderExtensions
Polly.LegacySupport
Polly.Outcome
Expand Down
5 changes: 4 additions & 1 deletion test/Polly.Core.Tests/Hedging/HedgingHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ public async Task GenerateAction_Generic_Ok()
{
var handler = new HedgingHandler<string>(
args => PredicateResult.True(),
args => () => Outcome.FromResultAsValueTask("ok"));
args => () => Outcome.FromResultAsValueTask("ok"),
args => default);

handler.OnHedging.Should().NotBeNull();

var action = handler.GenerateAction(new HedgingActionGeneratorArguments<string>(
ResilienceContextPool.Shared.Get(),
Expand Down
5 changes: 3 additions & 2 deletions test/Polly.Core.Tests/Hedging/HedgingHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ internal static class HedgingHelper
{
public static HedgingHandler<T> CreateHandler<T>(
Func<Outcome<T>, bool> shouldHandle,
Func<HedgingActionGeneratorArguments<T>, Func<ValueTask<Outcome<T>>>?> generator)
Func<HedgingActionGeneratorArguments<T>, Func<ValueTask<Outcome<T>>>?> generator,
Func<OnHedgingArguments<T>, ValueTask>? onHedging = null)
{
return new HedgingHandler<T>(args => new ValueTask<bool>(shouldHandle(args.Outcome!))!, generator);
return new HedgingHandler<T>(args => new ValueTask<bool>(shouldHandle(args.Outcome!))!, generator, onHedging);
}
}

Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.ComponentModel.DataAnnotations;
using System.Globalization;
using Polly.Hedging;
using Polly.Testing;

Expand Down Expand Up @@ -35,8 +34,7 @@ public void AddHedgingT_InvalidOptions_Throws()
[Fact]
public async Task AddHedging_IntegrationTest()
{
var hedgingWithoutOutcome = false;
ConcurrentQueue<string> results = new();
int hedgingCount = 0;

var strategy = _builder
.AddHedging(new()
Expand Down Expand Up @@ -64,15 +62,7 @@ public async Task AddHedging_IntegrationTest()
},
OnHedging = args =>
{
if (args.Outcome is { } outcome)
{
results.Enqueue(outcome.Result!.ToString(CultureInfo.InvariantCulture)!);
}
else
{
hedgingWithoutOutcome = true;
}
hedgingCount++;
return default;
}
})
Expand All @@ -85,8 +75,6 @@ public async Task AddHedging_IntegrationTest()
});

result.Should().Be("success");
results.Should().HaveCountGreaterThan(0);
results.Distinct().Should().ContainSingle("error");
hedgingWithoutOutcome.Should().BeTrue();
hedgingCount.Should().Be(4);
}
}
Loading

0 comments on commit 4759183

Please sign in to comment.