Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added condition check to order processing #567

Merged
merged 11 commits into from
Jul 4, 2024
4 changes: 4 additions & 0 deletions dbsetup.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#!/bin/bash
export PGPASSWORD=Password

# alter max connections
psql -h localhost -p 5432 -U platform_notifications_admin -d notificationsdb \
-c "ALTER SYSTEM SET max_connections TO '200';"

# set up platform_notifications role
psql -h localhost -p 5432 -U platform_notifications_admin -d notificationsdb \
-c "DO \$\$
Expand Down
3 changes: 2 additions & 1 deletion src/Altinn.Notifications.Core/Enums/OrderProcessingStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public enum OrderProcessingStatus
{
Registered,
Processing,
Completed
Completed,
SendConditionNotMet
}
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System.Diagnostics.CodeAnalysis;

namespace Altinn.Notifications.Core.Exceptions;

/// <summary>
/// Represents errors that occur during order processing operations.
/// </summary>
[ExcludeFromCodeCoverage]
public class OrderProcessingException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="OrderProcessingException"/> class.
/// </summary>
public OrderProcessingException() : base()
{
}

/// <summary>
/// Initializes a new instance of the <see cref="OrderProcessingException"/> class
/// with a specified error message.
/// </summary>
/// <param name="message">The message that describes the error.</param>
public OrderProcessingException(string message) : base(message)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="OrderProcessingException"/> class
/// with a specified error message and a reference to the inner exception that is the cause of this exception.
/// </summary>
/// <param name="message">The error message that explains the reason for the exception.</param>
/// <param name="inner">The exception that is the cause of the current exception, or a null reference if no inner exception is specified.</param>
public OrderProcessingException(string message, Exception inner) : base(message, inner)
{
}
}
1 change: 1 addition & 0 deletions src/Altinn.Notifications.Core/Services/GetOrderService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class GetOrderService : IGetOrderService
{ OrderProcessingStatus.Registered, "Order has been registered and is awaiting requested send time before processing." },
{ OrderProcessingStatus.Processing, "Order processing is ongoing. Notifications are being generated." },
{ OrderProcessingStatus.Completed, "Order processing is completed. All notifications have been generated." },
{ OrderProcessingStatus.SendConditionNotMet, "Order processing was stopped due to send condition not being met." }
};

/// <summary>
Expand Down
57 changes: 56 additions & 1 deletion src/Altinn.Notifications.Core/Services/OrderProcessingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

using Altinn.Notifications.Core.Configuration;
using Altinn.Notifications.Core.Enums;
using Altinn.Notifications.Core.Exceptions;
using Altinn.Notifications.Core.Integrations;
using Altinn.Notifications.Core.Models.Orders;
using Altinn.Notifications.Core.Persistence;
using Altinn.Notifications.Core.Services.Interfaces;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Altinn.Notifications.Core.Services;
Expand All @@ -19,8 +21,10 @@ public class OrderProcessingService : IOrderProcessingService
private readonly IOrderRepository _orderRepository;
private readonly IEmailOrderProcessingService _emailProcessingService;
private readonly ISmsOrderProcessingService _smsProcessingService;
private readonly IConditionClient _conditionClient;
private readonly IKafkaProducer _producer;
private readonly string _pastDueOrdersTopic;
private readonly ILogger<OrderProcessingService> _logger;

/// <summary>
/// Initializes a new instance of the <see cref="OrderProcessingService"/> class.
Expand All @@ -29,14 +33,18 @@ public OrderProcessingService(
IOrderRepository orderRepository,
IEmailOrderProcessingService emailProcessingService,
ISmsOrderProcessingService smsProcessingService,
IConditionClient conditionClient,
IKafkaProducer producer,
IOptions<KafkaSettings> kafkaSettings)
IOptions<KafkaSettings> kafkaSettings,
ILogger<OrderProcessingService> logger)
{
_orderRepository = orderRepository;
_emailProcessingService = emailProcessingService;
_smsProcessingService = smsProcessingService;
_conditionClient = conditionClient;
_producer = producer;
_pastDueOrdersTopic = kafkaSettings.Value.PastDueOrdersTopicName;
_logger = logger;
}

/// <inheritdoc/>
Expand Down Expand Up @@ -65,6 +73,12 @@ public async Task StartProcessingPastDueOrders()
/// <inheritdoc/>
public async Task ProcessOrder(NotificationOrder order)
{
if (!await IsSendConditionMet(order, isRetry: false))
{
await _orderRepository.SetProcessingStatus(order.Id, OrderProcessingStatus.SendConditionNotMet);
return;
}

NotificationChannel ch = order.NotificationChannel;

switch (ch)
Expand All @@ -83,6 +97,12 @@ public async Task ProcessOrder(NotificationOrder order)
/// <inheritdoc/>
public async Task ProcessOrderRetry(NotificationOrder order)
{
if (!await IsSendConditionMet(order, isRetry: true))
{
await _orderRepository.SetProcessingStatus(order.Id, OrderProcessingStatus.SendConditionNotMet);
return;
}

NotificationChannel ch = order.NotificationChannel;

switch (ch)
Expand All @@ -97,4 +117,39 @@ public async Task ProcessOrderRetry(NotificationOrder order)

await _orderRepository.SetProcessingStatus(order.Id, OrderProcessingStatus.Completed);
}

/// <summary>
/// Checks the send condition provided by the order request to determine if condition is met
/// </summary>
/// <param name="order">The notification order to check</param>
/// <param name="isRetry">Boolean indicating if this is a retry attempt</param>
/// <returns>True if condition is met and processing should continue</returns>
/// <exception cref="OrderProcessingException">Throws an exception if failure on first attempt ot check condition</exception>
internal async Task<bool> IsSendConditionMet(NotificationOrder order, bool isRetry)
{
if (order.ConditionEndpoint == null)
{
return true;
}

var conditionCheckResult = await _conditionClient.CheckSendCondition(order.ConditionEndpoint);

return conditionCheckResult.Match(
successResult =>
{
return successResult;
},
errorResult =>
{
if (!isRetry)
{
// Always send to retry on first error. Exception is caught by consumer and message is moved to retry topic.
throw new OrderProcessingException($"// OrderProcessingService // IsSendConditionMet // Condition check for order with ID '{order.Id}' failed with HTTP status code '{errorResult.StatusCode}' at endpoint '{order.ConditionEndpoint}'");
}

// notifications should always be created and sent if the condition check is not successful
_logger.LogInformation("// OrderProcessingService // IsSendConditionMet // Condition check for order with ID '{ID}' failed on retry. Processing regardless.", order.Id);
return true;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ namespace Altinn.Notifications.Integrations.Kafka.Consumers;
public class PastDueOrdersRetryConsumer : KafkaConsumerBase<PastDueOrdersRetryConsumer>
{
private readonly IOrderProcessingService _orderProcessingService;
private readonly IDateTimeService _dateTime;

private readonly int _processingDelayMins = 1;

/// <summary>
/// Initializes a new instance of the <see cref="PastDueOrdersRetryConsumer"/> class.
/// </summary>
public PastDueOrdersRetryConsumer(
IOrderProcessingService orderProcessingService,
IDateTimeService dateTimeService,
IOptions<KafkaSettings> settings,
ILogger<PastDueOrdersRetryConsumer> logger)
: base(settings, logger, settings.Value.PastDueOrdersRetryTopicName)
{
_orderProcessingService = orderProcessingService;
_dateTime = dateTimeService;
}

/// <inheritdoc/>
Expand All @@ -41,6 +46,16 @@ private async Task ProcessOrder(string message)
return;
}

// adding a delay relative to send time to allow transient faults to be resolved
TimeSpan diff = _dateTime.UtcNow() - order.RequestedSendTime;

TimeSpan delayForRetryAttempt = TimeSpan.FromMinutes(_processingDelayMins) - diff;

if (delayForRetryAttempt > TimeSpan.Zero)
{
await Task.Delay(delayForRetryAttempt);
}

await _orderProcessingService.ProcessOrderRetry(order!);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE public.orderprocessingstate ADD VALUE IF NOT EXISTS 'SendConditionNotMet';
Loading
Loading