diff --git a/docker-compose-confluent-cloud.yml b/docker-compose-confluent-cloud.yml index 6b7086f..aae2331 100644 --- a/docker-compose-confluent-cloud.yml +++ b/docker-compose-confluent-cloud.yml @@ -22,4 +22,7 @@ services: KAFKA_TYPE: ${KAFKA_TYPE} CONFLUENT_KEY: ${CONFLUENT_KEY} CONFLUENT_SECRET: ${CONFLUENT_SECRET} + KAFKA_ENABLE_AUTO_COMMIT: ${KAFKA_ENABLE_AUTO_COMMIT} + KAFKA_AUTO_COMMIT_INTERVAL_MS: ${KAFKA_AUTO_COMMIT_INTERVAL_MS} + KAFKA_SESSION_TIMEOUT_MS: ${KAFKA_SESSION_TIMEOUT_MS} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 7fa1e7e..e9ca4ca 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,6 +35,9 @@ services: AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY} HEADER_ACCEPT: ${HEADER_ACCEPT} HEADER_X_API_KEY: ${HEADER_X_API_KEY} + KAFKA_ENABLE_AUTO_COMMIT: ${KAFKA_ENABLE_AUTO_COMMIT} + KAFKA_AUTO_COMMIT_INTERVAL_MS: ${KAFKA_AUTO_COMMIT_INTERVAL_MS} + KAFKA_SESSION_TIMEOUT_MS: ${KAFKA_SESSION_TIMEOUT_MS} depends_on: - kafka diff --git a/run.sh b/run.sh index a3baff4..4054614 100644 --- a/run.sh +++ b/run.sh @@ -22,6 +22,9 @@ export API_ENDPOINT=$API_ENDPOINT export KAFKA_TYPE=$KAFKA_TYPE export CONFLUENT_KEY=$CONFLUENT_KEY export CONFLUENT_SECRET=$CONFLUENT_SECRET +export KAFKA_ENABLE_AUTO_COMMIT=$KAFKA_ENABLE_AUTO_COMMIT +export KAFKA_AUTO_COMMIT_INTERVAL_MS=$KAFKA_AUTO_COMMIT_INTERVAL_MS +export KAFKA_SESSION_TIMEOUT_MS=$KAFKA_SESSION_TIMEOUT_MS # build echo "Compiling." diff --git a/sample.env b/sample.env index b559847..46d4fe0 100644 --- a/sample.env +++ b/sample.env @@ -14,6 +14,12 @@ HEADER_X_API_KEY= KAFKA_TYPE= CONFLUENT_KEY= CONFLUENT_SECRET= +# Defaults to false +KAFKA_ENABLE_AUTO_COMMIT= +# Defaults to 1000 +KAFKA_AUTO_COMMIT_INTERVAL_MS= +# Defaults to 30000 +KAFKA_SESSION_TIMEOUT_MS= # MONGODB Variables MONGO_IP=${DOCKER_HOST_IP} diff --git a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java index 3dd4ba2..279264c 100644 --- a/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java +++ b/src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java @@ -22,6 +22,7 @@ import java.io.Writer; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.time.Duration; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; @@ -87,6 +88,10 @@ public class AwsDepositor { private String AWS_SESSION_TOKEN; private String AWS_EXPIRATION; + private String KAFKA_ENABLE_AUTO_COMMIT; + private String KAFKA_AUTO_COMMIT_INTERVAL_MS; + private String KAFKA_SESSION_TIMEOUT_MS; + public static void main(String[] args) throws Exception { AwsDepositor awsDepositor = new AwsDepositor(); awsDepositor.run(args); @@ -149,10 +154,14 @@ public void run(String[] args) throws Exception { addConfluentProperties(props); } + KAFKA_AUTO_COMMIT_INTERVAL_MS = getEnvironmentVariable("KAFKA_AUTO_COMMIT_INTERVAL_MS", "1000"); + KAFKA_ENABLE_AUTO_COMMIT = getEnvironmentVariable("KAFKA_ENABLE_AUTO_COMMIT", "false"); + KAFKA_SESSION_TIMEOUT_MS = getEnvironmentVariable("KAFKA_SESSION_TIMEOUT_MS", "30000"); + props.put("group.id", group); - props.put("enable.auto.commit", "false"); - props.put("auto.commit.interval.ms", "1000"); - props.put("session.timeout.ms", "30000"); + props.put("enable.auto.commit", KAFKA_ENABLE_AUTO_COMMIT); + props.put("auto.commit.interval.ms", KAFKA_AUTO_COMMIT_INTERVAL_MS); + props.put("session.timeout.ms", KAFKA_SESSION_TIMEOUT_MS); boolean depositToS3 = false; @@ -176,7 +185,7 @@ public void run(String[] args) throws Exception { boolean gotMessages = false; while (true) { - ConsumerRecords records = stringConsumer.poll(CONSUMER_POLL_TIMEOUT_MS); + ConsumerRecords records = stringConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS)); if (records != null && !records.isEmpty()) { for (ConsumerRecord record : records) { try {