From 916bc9c44c7702873a285f3d8165f9deea26fcb1 Mon Sep 17 00:00:00 2001 From: Stephanie Buadu <47737608+acn-sbuad@users.noreply.github.com> Date: Mon, 27 Nov 2023 16:13:51 +0100 Subject: [PATCH] consumer group per consumer type #GCPActive (#317) --- .../Configuration/KafkaSettings.cs | 5 +++ .../Kafka/Consumers/KafkaConsumerBase.cs | 2 +- .../Kafka/Producers/KafkaProducer.cs | 3 +- .../Kafka/SharedClientConfig.cs | 6 +++ .../ServiceCollectionExtensionsTests.cs | 38 ------------------- 5 files changed, 14 insertions(+), 40 deletions(-) delete mode 100644 test/Altinn.Notifications.Tests/Notifications.Integrations/TestingExtensions/ServiceCollectionExtensionsTests.cs diff --git a/src/Altinn.Notifications.Integrations/Configuration/KafkaSettings.cs b/src/Altinn.Notifications.Integrations/Configuration/KafkaSettings.cs index b03359a2..e1251a32 100644 --- a/src/Altinn.Notifications.Integrations/Configuration/KafkaSettings.cs +++ b/src/Altinn.Notifications.Integrations/Configuration/KafkaSettings.cs @@ -107,4 +107,9 @@ public class AdminSettings /// The list of topics the admin client is responsible for ensuring that exist /// public List TopicList { get; set; } = new List(); + + /// + /// The retention time in days for the topics + /// + public int RetentionTime { get; set; } = 7; } diff --git a/src/Altinn.Notifications.Integrations/Kafka/Consumers/KafkaConsumerBase.cs b/src/Altinn.Notifications.Integrations/Kafka/Consumers/KafkaConsumerBase.cs index 1ae630cc..3fb2bd85 100644 --- a/src/Altinn.Notifications.Integrations/Kafka/Consumers/KafkaConsumerBase.cs +++ b/src/Altinn.Notifications.Integrations/Kafka/Consumers/KafkaConsumerBase.cs @@ -32,7 +32,7 @@ protected KafkaConsumerBase( var consumerConfig = new ConsumerConfig(config.ConsumerSettings) { - GroupId = settings.Value.Consumer.GroupId, + GroupId = $"{settings.Value.Consumer.GroupId}-{GetType().Name.ToLower()}", EnableAutoCommit = false, EnableAutoOffsetStore = false, AutoOffsetReset = AutoOffsetReset.Earliest, diff --git a/src/Altinn.Notifications.Integrations/Kafka/Producers/KafkaProducer.cs b/src/Altinn.Notifications.Integrations/Kafka/Producers/KafkaProducer.cs index 2aba9d19..7f7f3da5 100644 --- a/src/Altinn.Notifications.Integrations/Kafka/Producers/KafkaProducer.cs +++ b/src/Altinn.Notifications.Integrations/Kafka/Producers/KafkaProducer.cs @@ -99,7 +99,8 @@ private void EnsureTopicsExist() { Name = topic, NumPartitions = TopicSpecification.NumPartitions, - ReplicationFactor = TopicSpecification.ReplicationFactor + ReplicationFactor = TopicSpecification.ReplicationFactor, + Configs = TopicSpecification.Configs } }).Wait(); _logger.LogInformation("// KafkaProducer // EnsureTopicsExists // Topic '{Topic}' created successfully.", topic); diff --git a/src/Altinn.Notifications.Integrations/Kafka/SharedClientConfig.cs b/src/Altinn.Notifications.Integrations/Kafka/SharedClientConfig.cs index 7c851d95..3c4ec59c 100644 --- a/src/Altinn.Notifications.Integrations/Kafka/SharedClientConfig.cs +++ b/src/Altinn.Notifications.Integrations/Kafka/SharedClientConfig.cs @@ -74,8 +74,14 @@ public SharedClientConfig(Configuration.KafkaSettings settings) consumerConfig.SaslUsername = settings.Consumer.SaslUsername; consumerConfig.SaslPassword = settings.Consumer.SaslPassword; + string retentionTime = settings.Admin.RetentionTime < 0 ? "-1" : TimeSpan.FromDays(settings.Admin.RetentionTime).TotalMilliseconds.ToString(); topicSpec.NumPartitions = 6; topicSpec.ReplicationFactor = 3; + topicSpec.Configs = new Dictionary() + { + { "retention.ms", retentionTime }, + { "cleanup.policy", "delete" } + }; } AdminClientSettings = adminConfig; diff --git a/test/Altinn.Notifications.Tests/Notifications.Integrations/TestingExtensions/ServiceCollectionExtensionsTests.cs b/test/Altinn.Notifications.Tests/Notifications.Integrations/TestingExtensions/ServiceCollectionExtensionsTests.cs deleted file mode 100644 index 88d931b4..00000000 --- a/test/Altinn.Notifications.Tests/Notifications.Integrations/TestingExtensions/ServiceCollectionExtensionsTests.cs +++ /dev/null @@ -1,38 +0,0 @@ -using System; - -using Altinn.Notifications.Integrations.Extensions; - -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; - -using Xunit; - -namespace Altinn.Notifications.Tests.Notifications.Integrations.TestingExtensions; - -public class ServiceCollectionExtensionsTests -{ - [Fact] - public void AddKafkaServices_KafkaSettingsMissing_ThrowsException() - { - Environment.SetEnvironmentVariable("KafkaSettings", null); - - var config = new ConfigurationBuilder().AddEnvironmentVariables().Build(); - - IServiceCollection services = new ServiceCollection(); - - Assert.Throws(() => services.AddKafkaServices(config)); - } - - [Fact] - public void AddKafkaHealthChecks_KafkaSettingsMissing_ThrowsException() - { - Environment.SetEnvironmentVariable("KafkaSettings", null); - - var config = new ConfigurationBuilder().AddEnvironmentVariables().Build(); - - IServiceCollection services = new ServiceCollection() - .AddLogging(); - - Assert.Throws(() => services.AddKafkaHealthChecks(config)); - } -}