diff --git a/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs b/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs index 9262efe..a485831 100644 --- a/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs +++ b/src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs @@ -404,27 +404,18 @@ async Task ProcessMessageWithInMemoryRetries(Dictionary headers, catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) { immediateProcessingAttempts++; - ErrorHandleResult errorHandlerResult; - try - { - var errorContext = new ErrorContext( - ex, - new Dictionary(headers), - nativeMessageId, - body, - transportTransaction, - immediateProcessingAttempts, - receiveQueueAddress, - context); - - errorHandlerResult = await ProcessFailedMessage(errorContext, lambdaContext, cancellationToken).ConfigureAwait(false); - } - catch (Exception onErrorEx) when (!onErrorEx.IsCausedBy(cancellationToken)) - { - Logger.Warn($"Failed to execute recoverability policy for message with native ID: `{nativeMessageId}`", onErrorEx); - throw; - } + var errorContext = new ErrorContext( + ex, + new Dictionary(headers), + nativeMessageId, + body, + transportTransaction, + immediateProcessingAttempts, + receiveQueueAddress, + context); + + var errorHandlerResult = await ProcessFailedMessage(errorContext, lambdaContext, nativeMessageId, cancellationToken).ConfigureAwait(false); errorHandled = errorHandlerResult == ErrorHandleResult.Handled; } @@ -435,17 +426,26 @@ async Task Process(MessageContext messageContext, ILambdaContext executionContex { await InitializeEndpointIfNecessary(executionContext, cancellationToken) .ConfigureAwait(false); + await pipeline.PushMessage(messageContext, cancellationToken) .ConfigureAwait(false); } - async Task ProcessFailedMessage(ErrorContext errorContext, ILambdaContext executionContext, CancellationToken cancellationToken) + async Task ProcessFailedMessage(ErrorContext errorContext, ILambdaContext executionContext, string nativeMessageId, CancellationToken cancellationToken) { - await InitializeEndpointIfNecessary(executionContext, cancellationToken) - .ConfigureAwait(false); + try + { + await InitializeEndpointIfNecessary(executionContext, cancellationToken) + .ConfigureAwait(false); - return await pipeline.PushFailedMessage(errorContext, cancellationToken) - .ConfigureAwait(false); + return await pipeline.PushFailedMessage(errorContext, cancellationToken) + .ConfigureAwait(false); + } + catch (Exception onErrorEx) when (!onErrorEx.IsCausedBy(cancellationToken)) + { + Logger.Warn($"Failed to execute recoverability policy for message with native ID: `{nativeMessageId}`", onErrorEx); + throw; + } } async Task DeleteMessageAndBodyIfRequired(Message message, string messageS3BodyKey, CancellationToken cancellationToken)