diff --git a/dbsetup.sh b/dbsetup.sh index 23c45aca..8d726b60 100755 --- a/dbsetup.sh +++ b/dbsetup.sh @@ -1,6 +1,10 @@ #!/bin/bash export PGPASSWORD=Password +# alter max connections +psql -h localhost -p 5432 -U platform_notifications_admin -d notificationsdb \ +-c "ALTER SYSTEM SET max_connections TO '200';" + # set up platform_notifications role psql -h localhost -p 5432 -U platform_notifications_admin -d notificationsdb \ -c "DO \$\$ diff --git a/src/Altinn.Notifications.Core/Enums/OrderProcessingStatus.cs b/src/Altinn.Notifications.Core/Enums/OrderProcessingStatus.cs index 757bb44f..645cecab 100644 --- a/src/Altinn.Notifications.Core/Enums/OrderProcessingStatus.cs +++ b/src/Altinn.Notifications.Core/Enums/OrderProcessingStatus.cs @@ -8,6 +8,7 @@ public enum OrderProcessingStatus { Registered, Processing, - Completed + Completed, + SendConditionNotMet } #pragma warning restore CS1591 // Missing XML comment for publicly visible type or member diff --git a/src/Altinn.Notifications.Core/Exceptions/OrderProcessingException.cs b/src/Altinn.Notifications.Core/Exceptions/OrderProcessingException.cs new file mode 100644 index 00000000..90a1b3c6 --- /dev/null +++ b/src/Altinn.Notifications.Core/Exceptions/OrderProcessingException.cs @@ -0,0 +1,33 @@ +namespace Altinn.Notifications.Core.Exceptions; + +/// +/// Represents errors that occur during order processing operations. +/// +public class OrderProcessingException : Exception +{ + /// + /// Initializes a new instance of the class. + /// + public OrderProcessingException() : base() + { + } + + /// + /// Initializes a new instance of the class + /// with a specified error message. + /// + /// The message that describes the error. + public OrderProcessingException(string message) : base(message) + { + } + + /// + /// Initializes a new instance of the class + /// with a specified error message and a reference to the inner exception that is the cause of this exception. + /// + /// The error message that explains the reason for the exception. + /// The exception that is the cause of the current exception, or a null reference if no inner exception is specified. + public OrderProcessingException(string message, Exception inner) : base(message, inner) + { + } +} diff --git a/src/Altinn.Notifications.Core/Services/GetOrderService.cs b/src/Altinn.Notifications.Core/Services/GetOrderService.cs index 87bea858..e6d4c166 100644 --- a/src/Altinn.Notifications.Core/Services/GetOrderService.cs +++ b/src/Altinn.Notifications.Core/Services/GetOrderService.cs @@ -17,6 +17,7 @@ public class GetOrderService : IGetOrderService { OrderProcessingStatus.Registered, "Order has been registered and is awaiting requested send time before processing." }, { OrderProcessingStatus.Processing, "Order processing is ongoing. Notifications are being generated." }, { OrderProcessingStatus.Completed, "Order processing is completed. All notifications have been generated." }, + { OrderProcessingStatus.SendConditionNotMet, "Order processing was stopped due to send condition not being met."} }; /// diff --git a/src/Altinn.Notifications.Core/Services/OrderProcessingService.cs b/src/Altinn.Notifications.Core/Services/OrderProcessingService.cs index 68888568..fc283011 100644 --- a/src/Altinn.Notifications.Core/Services/OrderProcessingService.cs +++ b/src/Altinn.Notifications.Core/Services/OrderProcessingService.cs @@ -2,11 +2,13 @@ using Altinn.Notifications.Core.Configuration; using Altinn.Notifications.Core.Enums; +using Altinn.Notifications.Core.Exceptions; using Altinn.Notifications.Core.Integrations; using Altinn.Notifications.Core.Models.Orders; using Altinn.Notifications.Core.Persistence; using Altinn.Notifications.Core.Services.Interfaces; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace Altinn.Notifications.Core.Services; @@ -19,8 +21,10 @@ public class OrderProcessingService : IOrderProcessingService private readonly IOrderRepository _orderRepository; private readonly IEmailOrderProcessingService _emailProcessingService; private readonly ISmsOrderProcessingService _smsProcessingService; + private readonly IConditionClient _conditionClient; private readonly IKafkaProducer _producer; private readonly string _pastDueOrdersTopic; + private readonly ILogger _logger; /// /// Initializes a new instance of the class. @@ -29,14 +33,18 @@ public OrderProcessingService( IOrderRepository orderRepository, IEmailOrderProcessingService emailProcessingService, ISmsOrderProcessingService smsProcessingService, + IConditionClient conditionClient, IKafkaProducer producer, - IOptions kafkaSettings) + IOptions kafkaSettings, + ILogger logger) { _orderRepository = orderRepository; _emailProcessingService = emailProcessingService; _smsProcessingService = smsProcessingService; + _conditionClient = conditionClient; _producer = producer; _pastDueOrdersTopic = kafkaSettings.Value.PastDueOrdersTopicName; + _logger = logger; } /// @@ -65,6 +73,12 @@ public async Task StartProcessingPastDueOrders() /// public async Task ProcessOrder(NotificationOrder order) { + if (!await IsSendConditionMet(order, isRetry: false)) + { + await _orderRepository.SetProcessingStatus(order.Id, OrderProcessingStatus.SendConditionNotMet); + return; + } + NotificationChannel ch = order.NotificationChannel; switch (ch) @@ -83,6 +97,12 @@ public async Task ProcessOrder(NotificationOrder order) /// public async Task ProcessOrderRetry(NotificationOrder order) { + if (!await IsSendConditionMet(order, isRetry: true)) + { + await _orderRepository.SetProcessingStatus(order.Id, OrderProcessingStatus.SendConditionNotMet); + return; + } + NotificationChannel ch = order.NotificationChannel; switch (ch) @@ -97,4 +117,33 @@ public async Task ProcessOrderRetry(NotificationOrder order) await _orderRepository.SetProcessingStatus(order.Id, OrderProcessingStatus.Completed); } + + private async Task IsSendConditionMet(NotificationOrder order, bool isRetry) + { + if (order.ConditionEndpoint == null) + { + return true; + } + + var conditionCheckResult = await _conditionClient.CheckSendCondition(order.ConditionEndpoint); + + return conditionCheckResult.Match( + successResult => + { + return successResult; + }, + errorResult => + { + if (!isRetry) + { + // always send to retry on first error. + throw new OrderProcessingException($"// OrderProcessingService // IsSendConditionMet // Condition check for order with ID '{order.Id}' failed with HTTP status code '{errorResult.StatusCode}' at endpoint '{order.ConditionEndpoint}'"); + } + + // notifications should always be created and sent if the condition check is not successful + // log error. + _logger.LogInformation("// OrderProcessingService // IsSendConditionMet // Condition check for order with ID '{order.Id}' failed on retry. Processing regardless.", order.Id); + return true; + }); + } } diff --git a/src/Altinn.Notifications.Integrations/Kafka/Consumers/PastDueOrdersRetryConsumer.cs b/src/Altinn.Notifications.Integrations/Kafka/Consumers/PastDueOrdersRetryConsumer.cs index 09ec0212..2ab9f5cb 100644 --- a/src/Altinn.Notifications.Integrations/Kafka/Consumers/PastDueOrdersRetryConsumer.cs +++ b/src/Altinn.Notifications.Integrations/Kafka/Consumers/PastDueOrdersRetryConsumer.cs @@ -13,17 +13,22 @@ namespace Altinn.Notifications.Integrations.Kafka.Consumers; public class PastDueOrdersRetryConsumer : KafkaConsumerBase { private readonly IOrderProcessingService _orderProcessingService; + private readonly IDateTimeService _dateTime; + + private readonly int _processingDelay = 60000; /// /// Initializes a new instance of the class. /// public PastDueOrdersRetryConsumer( IOrderProcessingService orderProcessingService, + IDateTimeService dateTimeService, IOptions settings, ILogger logger) : base(settings, logger, settings.Value.PastDueOrdersRetryTopicName) { _orderProcessingService = orderProcessingService; + _dateTime = dateTimeService; } /// @@ -41,6 +46,14 @@ private async Task ProcessOrder(string message) return; } + // adding a delay relative to send time to allow transient faults to be resolved + int diff = (int)(_dateTime.UtcNow() - order.RequestedSendTime).TotalMilliseconds; + + if (diff < _processingDelay) + { + await Task.Delay(_processingDelay - diff); + } + await _orderProcessingService.ProcessOrderRetry(order!); } diff --git a/test/Altinn.Notifications.Tests/Notifications.Core/TestingServices/GetOrderServiceTests.cs b/test/Altinn.Notifications.Tests/Notifications.Core/TestingServices/GetOrderServiceTests.cs index 703b6172..30b5ed83 100644 --- a/test/Altinn.Notifications.Tests/Notifications.Core/TestingServices/GetOrderServiceTests.cs +++ b/test/Altinn.Notifications.Tests/Notifications.Core/TestingServices/GetOrderServiceTests.cs @@ -135,6 +135,7 @@ await result.Match( [InlineData(OrderProcessingStatus.Registered, "Order has been registered and is awaiting requested send time before processing.")] [InlineData(OrderProcessingStatus.Processing, "Order processing is ongoing. Notifications are being generated.")] [InlineData(OrderProcessingStatus.Completed, "Order processing is completed. All notifications have been generated.")] + [InlineData(OrderProcessingStatus.SendConditionNotMet, "Order processing was stopped due to send condition not being met.")] public void GetStatusDescription_ExpectedDescription(OrderProcessingStatus status, string expected) { string actual = GetOrderService.GetStatusDescription(status); diff --git a/test/Altinn.Notifications.Tests/Notifications.Core/TestingServices/OrderProcessingServiceTests.cs b/test/Altinn.Notifications.Tests/Notifications.Core/TestingServices/OrderProcessingServiceTests.cs index 7c252c3f..dd4c820b 100644 --- a/test/Altinn.Notifications.Tests/Notifications.Core/TestingServices/OrderProcessingServiceTests.cs +++ b/test/Altinn.Notifications.Tests/Notifications.Core/TestingServices/OrderProcessingServiceTests.cs @@ -9,6 +9,9 @@ using Altinn.Notifications.Core.Services; using Altinn.Notifications.Core.Services.Interfaces; +using Castle.Core.Logging; + +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Moq; @@ -179,7 +182,8 @@ private static OrderProcessingService GetTestService( IOrderRepository? repo = null, IEmailOrderProcessingService? emailMock = null, ISmsOrderProcessingService? smsMock = null, - IKafkaProducer? producer = null) + IKafkaProducer? producer = null, + IConditionClient? conditionClient = null) { if (repo == null) { @@ -205,8 +209,14 @@ private static OrderProcessingService GetTestService( producer = producerMock.Object; } + if (conditionClient == null) + { + var conditionClientMock = new Mock(); + conditionClient = conditionClientMock.Object; + } + var kafkaSettings = new Altinn.Notifications.Core.Configuration.KafkaSettings() { PastDueOrdersTopicName = _pastDueTopicName }; - return new OrderProcessingService(repo, emailMock, smsMock, producer, Options.Create(kafkaSettings)); + return new OrderProcessingService(repo, emailMock, smsMock, conditionClient, producer, Options.Create(kafkaSettings), new LoggerFactory().CreateLogger()); } }