Skip to content

Commit

Permalink
Merge pull request #13 from Trendyol/feat/issue-12-detect-kafka-changes
Browse files Browse the repository at this point in the history
Feat/issue 12 detect kafka changes
  • Loading branch information
OguzErdi authored Dec 17, 2024
2 parents f730103 + 4969825 commit abfe19e
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 21 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## v1.4.0 (December 17, 2024)

### Added:
- `ReloadOnChange` flag to Kafka configuration. This allows you to change the configuration without restarting the application.

## v1.3.0 (October 26, 2024)

### Changed:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ set:
| **Key** | **Type** | **Description** |
|----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `Kafka.ReloadOnChange` | bool | The flag indicating whether the Kafka configuration should be reloaded when the configuration file changes. |
| `Kafka.SaslUsername` | string | The username for the SASL authentication of the Kafka cluster. |
| `Kafka.Brokers` | string | The addresses of the Kafka brokers. |
| `Kafka.SaslPassword` | string | The password for the SASL authentication of the Kafka cluster. |
Expand Down
1 change: 1 addition & 0 deletions example/Couchbase/config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
]
},
"Kafka": {
"reloadOnChange": true,
"SaslUsername": "",
"Brokers": "",
"SslCaLocation": "",
Expand Down
1 change: 1 addition & 0 deletions example/MsSql/config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
]
},
"Kafka": {
"reloadOnChange": true,
"SaslUsername": "",
"Brokers": "",
"SslCaLocation": "",
Expand Down
1 change: 1 addition & 0 deletions example/Postgres/config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
]
},
"Kafka": {
"reloadOnChange": true,
"SaslUsername": "",
"Brokers": "",
"SslCaLocation": "",
Expand Down
1 change: 1 addition & 0 deletions src/ConfigOptions/Kafka.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ public class Kafka
public Acks? Acks { get; set; }
public double? LingerMs { get; set; }
public string ClientId { get; set; }
public bool ReloadOnChange { get; set; }
}
26 changes: 23 additions & 3 deletions src/Coordinators/Services/KafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NewRelic.Api.Agent;
using PollingOutboxPublisher.ConfigOptions;
Expand All @@ -13,13 +14,26 @@ namespace PollingOutboxPublisher.Coordinators.Services;
public sealed class KafkaProducer : IKafkaProducer
{
private IProducer<string, string> _producer;
private Kafka _kafkaConfiguration;
private readonly ILogger<KafkaProducer> _logger;

public KafkaProducer(IOptions<Kafka> kafkaOptions)
public KafkaProducer(IOptionsMonitor<Kafka> kafkaOptionsMonitor, ILogger<KafkaProducer> logger)
{
var kafkaConfiguration = kafkaOptions.Value;
BuildProducer(kafkaConfiguration);
_logger = logger;
_kafkaConfiguration = kafkaOptionsMonitor.CurrentValue;
kafkaOptionsMonitor.OnChange(OnKafkaConfigChanged);
BuildProducer(_kafkaConfiguration);
}

private void OnKafkaConfigChanged(Kafka newKafkaConfig)
{
if (!newKafkaConfig.ReloadOnChange) return;

_logger.LogInformation("Kafka configuration changed");
_kafkaConfiguration = newKafkaConfig;
BuildProducer(_kafkaConfiguration);
}

private void BuildProducer(Kafka kafka)
{
var config = new ProducerConfig
Expand All @@ -38,6 +52,12 @@ private void BuildProducer(Kafka kafka)
MessageMaxBytes = kafka.MessageMaxBytes ?? 30000000,
Acks = kafka.Acks ?? Acks.Leader
};

if (_producer != null)
{
_producer.Flush(TimeSpan.FromSeconds(10));
_producer.Dispose();
}
_producer = new ProducerBuilder<string, string>(config).Build();
}

Expand Down
2 changes: 1 addition & 1 deletion src/PollingOutboxPublisher.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<IsPackable>false</IsPackable>
<Version>1.3.0</Version>
<Version>1.4.0</Version>
<RootNamespace>PollingOutboxPublisher</RootNamespace>
</PropertyGroup>

Expand Down
21 changes: 4 additions & 17 deletions src/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,20 @@ public static class Program
{
public static async Task Main(string[] args)
{
CheckConfigFileExistence();
var host = CreateHostBuilder(args).Build();

//ExampleRunner.RunPostgresExample(host);

await host.RunAsync();
}

private static void CheckConfigFileExistence()
{
var isConfigExist = File.Exists(Directory.GetCurrentDirectory() + "/config/config.json");
if (!isConfigExist)
throw new FileNotFoundException("File not found", "config.json");

var isSecretExist = File.Exists(Directory.GetCurrentDirectory() + "/config/secret.json");
if (!isSecretExist)
throw new FileNotFoundException("File not found", "secret.json");
}


private static IHostBuilder CreateHostBuilder(string[] args)
{
return Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration(
(_, builder) =>
{
builder.AddJsonFile("config/config.json", true, true)
.AddJsonFile("config/secret.json", true, true)
builder
.AddJsonFile("config/config.json", false, true)
.AddJsonFile("config/secret.json", false, true)
.Build();
})
.ConfigureServices((hostContext, services) =>
Expand Down

0 comments on commit abfe19e

Please sign in to comment.