From 8ffd14fcbff7c07f472d3cf760ed51eeca357f6d Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Mon, 29 Apr 2024 13:34:23 -0600 Subject: [PATCH 1/3] Update hardcoded kafka properties to be configurable --- docker-compose-confluent-cloud.yml | 3 +++ docker-compose.yml | 3 +++ run.sh | 3 +++ sample.env | 3 +++ .../its/jpo/ode/aws/depositor/AwsDepositor.java | 17 +++++++++++++---- 5 files changed, 25 insertions(+), 4 deletions(-) diff --git a/docker-compose-confluent-cloud.yml b/docker-compose-confluent-cloud.yml index 6b7086f..aae2331 100644 --- a/docker-compose-confluent-cloud.yml +++ b/docker-compose-confluent-cloud.yml @@ -22,4 +22,7 @@ services: KAFKA_TYPE: ${KAFKA_TYPE} CONFLUENT_KEY: ${CONFLUENT_KEY} CONFLUENT_SECRET: ${CONFLUENT_SECRET} + KAFKA_ENABLE_AUTO_COMMIT: ${KAFKA_ENABLE_AUTO_COMMIT} + KAFKA_AUTO_COMMIT_INTERVAL_MS: ${KAFKA_AUTO_COMMIT_INTERVAL_MS} + KAFKA_SESSION_TIMEOUT_MS: ${KAFKA_SESSION_TIMEOUT_MS} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 7fa1e7e..e9ca4ca 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,6 +35,9 @@ services: AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} HEADER_ACCEPT: ${HEADER_ACCEPT} HEADER_X_API_KEY: ${HEADER_X_API_KEY} + KAFKA_ENABLE_AUTO_COMMIT: ${KAFKA_ENABLE_AUTO_COMMIT} + KAFKA_AUTO_COMMIT_INTERVAL_MS: ${KAFKA_AUTO_COMMIT_INTERVAL_MS} + KAFKA_SESSION_TIMEOUT_MS: ${KAFKA_SESSION_TIMEOUT_MS} depends_on: - kafka diff --git a/run.sh b/run.sh index a3baff4..4054614 100644 --- a/run.sh +++ b/run.sh @@ -22,6 +22,9 @@ export API_ENDPOINT=$API_ENDPOINT export KAFKA_TYPE=$KAFKA_TYPE export CONFLUENT_KEY=$CONFLUENT_KEY export CONFLUENT_SECRET=$CONFLUENT_SECRET +export KAFKA_ENABLE_AUTO_COMMIT=$KAFKA_ENABLE_AUTO_COMMIT +export KAFKA_AUTO_COMMIT_INTERVAL_MS=$KAFKA_AUTO_COMMIT_INTERVAL_MS +export KAFKA_SESSION_TIMEOUT_MS=$KAFKA_SESSION_TIMEOUT_MS # build echo "Compiling." diff --git a/sample.env b/sample.env index b559847..bbf6ca1 100644 --- a/sample.env +++ b/sample.env @@ -14,6 +14,9 @@ HEADER_X_API_KEY= KAFKA_TYPE= CONFLUENT_KEY= CONFLUENT_SECRET= +KAFKA_ENABLE_AUTO_COMMIT= +KAFKA_AUTO_COMMIT_INTERVAL_MS= +KAFKA_SESSION_TIMEOUT_MS= # MONGODB Variables MONGO_IP=${DOCKER_HOST_IP} diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java index 3dd4ba2..279264c 100644 --- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java +++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java @@ -22,6 +22,7 @@ import java.io.Writer; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.time.Duration; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; @@ -87,6 +88,10 @@ public class AwsDepositor { private String AWS_SESSION_TOKEN; private String AWS_EXPIRATION; + private String KAFKA_ENABLE_AUTO_COMMIT; + private String KAFKA_AUTO_COMMIT_INTERVAL_MS; + private String KAFKA_SESSION_TIMEOUT_MS; + public static void main(String[] args) throws Exception { AwsDepositor awsDepositor = new AwsDepositor(); awsDepositor.run(args); @@ -149,10 +154,14 @@ public void run(String[] args) throws Exception { addConfluentProperties(props); } + KAFKA_AUTO_COMMIT_INTERVAL_MS = getEnvironmentVariable("KAFKA_AUTO_COMMIT_INTERVAL_MS", "1000"); + KAFKA_ENABLE_AUTO_COMMIT = getEnvironmentVariable("KAFKA_ENABLE_AUTO_COMMIT", "false"); + KAFKA_SESSION_TIMEOUT_MS = getEnvironmentVariable("KAFKA_SESSION_TIMEOUT_MS", "30000"); + props.put("group.id", group); - props.put("enable.auto.commit", "false"); - props.put("auto.commit.interval.ms", "1000"); - props.put("session.timeout.ms", "30000"); + props.put("enable.auto.commit", KAFKA_ENABLE_AUTO_COMMIT); + props.put("auto.commit.interval.ms", KAFKA_AUTO_COMMIT_INTERVAL_MS); + props.put("session.timeout.ms", KAFKA_SESSION_TIMEOUT_MS); boolean depositToS3 = false; @@ -176,7 +185,7 @@ public void run(String[] args) throws Exception { boolean gotMessages = false; while (true) { - ConsumerRecords records = stringConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); + ConsumerRecords records = stringConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS)); if (records != null && !records.isEmpty()) { for (ConsumerRecord record : records) { try { From a4bebce5068a8d7e9943056eb09cec81343253f8 Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Tue, 30 Apr 2024 12:59:28 -0600 Subject: [PATCH 2/3] Update sample.env comments --- sample.env | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sample.env b/sample.env index bbf6ca1..f2dd709 100644 --- a/sample.env +++ b/sample.env @@ -14,9 +14,9 @@ HEADER_X_API_KEY= KAFKA_TYPE= CONFLUENT_KEY= CONFLUENT_SECRET= -KAFKA_ENABLE_AUTO_COMMIT= -KAFKA_AUTO_COMMIT_INTERVAL_MS= -KAFKA_SESSION_TIMEOUT_MS= +KAFKA_ENABLE_AUTO_COMMIT= # Defaults to false +KAFKA_AUTO_COMMIT_INTERVAL_MS= # Defaults to 1000 +KAFKA_SESSION_TIMEOUT_MS= # Defaults to 30000 # MONGODB Variables MONGO_IP=${DOCKER_HOST_IP} From c733e463d354e5e95642d15afbbac86dd1eeba6e Mon Sep 17 00:00:00 2001 From: Marc Wodahl Date: Thu, 9 May 2024 07:56:38 -0600 Subject: [PATCH 3/3] Moved sample.env comments to separate lines --- sample.env | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sample.env b/sample.env index f2dd709..46d4fe0 100644 --- a/sample.env +++ b/sample.env @@ -14,9 +14,12 @@ HEADER_X_API_KEY= KAFKA_TYPE= CONFLUENT_KEY= CONFLUENT_SECRET= -KAFKA_ENABLE_AUTO_COMMIT= # Defaults to false -KAFKA_AUTO_COMMIT_INTERVAL_MS= # Defaults to 1000 -KAFKA_SESSION_TIMEOUT_MS= # Defaults to 30000 +# Defaults to false +KAFKA_ENABLE_AUTO_COMMIT= +# Defaults to 1000 +KAFKA_AUTO_COMMIT_INTERVAL_MS= +# Defaults to 30000 +KAFKA_SESSION_TIMEOUT_MS= # MONGODB Variables MONGO_IP=${DOCKER_HOST_IP}