diff --git a/CHANGELOG.md b/CHANGELOG.md index d92c537..f9caa3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## v1.4.0 (December 17, 2024) + +### Added: +- `ReloadOnChange` flag to Kafka configuration. This allows you to change the configuration without restarting the application. + ## v1.3.0 (October 26, 2024) ### Changed: diff --git a/README.md b/README.md index 02f5399..6991a0e 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,7 @@ set: | **Key** | **Type** | **Description** | |----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `Kafka.ReloadOnChange` | bool | The flag indicating whether the Kafka configuration should be reloaded when the configuration file changes. | | `Kafka.SaslUsername` | string | The username for the SASL authentication of the Kafka cluster. | | `Kafka.Brokers` | string | The addresses of the Kafka brokers. | | `Kafka.SaslPassword` | string | The password for the SASL authentication of the Kafka cluster. | diff --git a/example/Couchbase/config/config.json b/example/Couchbase/config/config.json index 7a993c7..f8829d5 100644 --- a/example/Couchbase/config/config.json +++ b/example/Couchbase/config/config.json @@ -19,6 +19,7 @@ ] }, "Kafka": { + "reloadOnChange": true, "SaslUsername": "", "Brokers": "", "SslCaLocation": "", diff --git a/example/MsSql/config/config.json b/example/MsSql/config/config.json index 4e9244a..fcf8d7c 100644 --- a/example/MsSql/config/config.json +++ b/example/MsSql/config/config.json @@ -19,6 +19,7 @@ ] }, "Kafka": { + "reloadOnChange": true, "SaslUsername": "", "Brokers": "", "SslCaLocation": "", diff --git a/example/Postgres/config/config.json b/example/Postgres/config/config.json index d0cafcf..5d3f554 100644 --- a/example/Postgres/config/config.json +++ b/example/Postgres/config/config.json @@ -19,6 +19,7 @@ ] }, "Kafka": { + "reloadOnChange": true, "SaslUsername": "", "Brokers": "", "SslCaLocation": "", diff --git a/src/ConfigOptions/Kafka.cs b/src/ConfigOptions/Kafka.cs index 2b3dbef..4c4518d 100644 --- a/src/ConfigOptions/Kafka.cs +++ b/src/ConfigOptions/Kafka.cs @@ -19,4 +19,5 @@ public class Kafka public Acks? Acks { get; set; } public double? LingerMs { get; set; } public string ClientId { get; set; } + public bool ReloadOnChange { get; set; } } \ No newline at end of file diff --git a/src/Coordinators/Services/KafkaProducer.cs b/src/Coordinators/Services/KafkaProducer.cs index 75b1510..8d7c770 100644 --- a/src/Coordinators/Services/KafkaProducer.cs +++ b/src/Coordinators/Services/KafkaProducer.cs @@ -2,6 +2,7 @@ using System.Diagnostics.CodeAnalysis; using System.Threading.Tasks; using Confluent.Kafka; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using NewRelic.Api.Agent; using PollingOutboxPublisher.ConfigOptions; @@ -13,13 +14,26 @@ namespace PollingOutboxPublisher.Coordinators.Services; public sealed class KafkaProducer : IKafkaProducer { private IProducer _producer; + private Kafka _kafkaConfiguration; + private readonly ILogger _logger; - public KafkaProducer(IOptions kafkaOptions) + public KafkaProducer(IOptionsMonitor kafkaOptionsMonitor, ILogger logger) { - var kafkaConfiguration = kafkaOptions.Value; - BuildProducer(kafkaConfiguration); + _logger = logger; + _kafkaConfiguration = kafkaOptionsMonitor.CurrentValue; + kafkaOptionsMonitor.OnChange(OnKafkaConfigChanged); + BuildProducer(_kafkaConfiguration); } + private void OnKafkaConfigChanged(Kafka newKafkaConfig) + { + if (!newKafkaConfig.ReloadOnChange) return; + + _logger.LogInformation("Kafka configuration changed"); + _kafkaConfiguration = newKafkaConfig; + BuildProducer(_kafkaConfiguration); + } + private void BuildProducer(Kafka kafka) { var config = new ProducerConfig @@ -38,6 +52,12 @@ private void BuildProducer(Kafka kafka) MessageMaxBytes = kafka.MessageMaxBytes ?? 30000000, Acks = kafka.Acks ?? Acks.Leader }; + + if (_producer != null) + { + _producer.Flush(TimeSpan.FromSeconds(10)); + _producer.Dispose(); + } _producer = new ProducerBuilder(config).Build(); } diff --git a/src/PollingOutboxPublisher.csproj b/src/PollingOutboxPublisher.csproj index d4bf837..9956c68 100644 --- a/src/PollingOutboxPublisher.csproj +++ b/src/PollingOutboxPublisher.csproj @@ -4,7 +4,7 @@ Exe net8.0 false - 1.3.0 + 1.4.0 PollingOutboxPublisher diff --git a/src/Program.cs b/src/Program.cs index 0e4941f..2c5bb52 100644 --- a/src/Program.cs +++ b/src/Program.cs @@ -31,33 +31,20 @@ public static class Program { public static async Task Main(string[] args) { - CheckConfigFileExistence(); var host = CreateHostBuilder(args).Build(); - //ExampleRunner.RunPostgresExample(host); - await host.RunAsync(); } - - private static void CheckConfigFileExistence() - { - var isConfigExist = File.Exists(Directory.GetCurrentDirectory() + "/config/config.json"); - if (!isConfigExist) - throw new FileNotFoundException("File not found", "config.json"); - - var isSecretExist = File.Exists(Directory.GetCurrentDirectory() + "/config/secret.json"); - if (!isSecretExist) - throw new FileNotFoundException("File not found", "secret.json"); - } - + private static IHostBuilder CreateHostBuilder(string[] args) { return Host.CreateDefaultBuilder(args) .ConfigureAppConfiguration( (_, builder) => { - builder.AddJsonFile("config/config.json", true, true) - .AddJsonFile("config/secret.json", true, true) + builder + .AddJsonFile("config/config.json", false, true) + .AddJsonFile("config/secret.json", false, true) .Build(); }) .ConfigureServices((hostContext, services) =>