Skip to content

Commit

Permalink
Merge pull request #18 from CDOT-CV/Fix/hardcoded-variables
Browse files Browse the repository at this point in the history
Update Hardcoded Kafka Properties to be Configurable
  • Loading branch information
dmccoystephenson authored May 22, 2024
2 parents 5e4009f + c733e46 commit 8bf4e57
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 4 deletions.
3 changes: 3 additions & 0 deletions docker-compose-confluent-cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ services:
KAFKA_TYPE: ${KAFKA_TYPE}
CONFLUENT_KEY: ${CONFLUENT_KEY}
CONFLUENT_SECRET: ${CONFLUENT_SECRET}
KAFKA_ENABLE_AUTO_COMMIT: ${KAFKA_ENABLE_AUTO_COMMIT}
KAFKA_AUTO_COMMIT_INTERVAL_MS: ${KAFKA_AUTO_COMMIT_INTERVAL_MS}
KAFKA_SESSION_TIMEOUT_MS: ${KAFKA_SESSION_TIMEOUT_MS}

3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ services:
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
HEADER_ACCEPT: ${HEADER_ACCEPT}
HEADER_X_API_KEY: ${HEADER_X_API_KEY}
KAFKA_ENABLE_AUTO_COMMIT: ${KAFKA_ENABLE_AUTO_COMMIT}
KAFKA_AUTO_COMMIT_INTERVAL_MS: ${KAFKA_AUTO_COMMIT_INTERVAL_MS}
KAFKA_SESSION_TIMEOUT_MS: ${KAFKA_SESSION_TIMEOUT_MS}
depends_on:
- kafka

Expand Down
3 changes: 3 additions & 0 deletions run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ export API_ENDPOINT=$API_ENDPOINT
export KAFKA_TYPE=$KAFKA_TYPE
export CONFLUENT_KEY=$CONFLUENT_KEY
export CONFLUENT_SECRET=$CONFLUENT_SECRET
export KAFKA_ENABLE_AUTO_COMMIT=$KAFKA_ENABLE_AUTO_COMMIT
export KAFKA_AUTO_COMMIT_INTERVAL_MS=$KAFKA_AUTO_COMMIT_INTERVAL_MS
export KAFKA_SESSION_TIMEOUT_MS=$KAFKA_SESSION_TIMEOUT_MS

# build
echo "Compiling."
Expand Down
6 changes: 6 additions & 0 deletions sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ HEADER_X_API_KEY=
KAFKA_TYPE=
CONFLUENT_KEY=
CONFLUENT_SECRET=
# Defaults to false
KAFKA_ENABLE_AUTO_COMMIT=
# Defaults to 1000
KAFKA_AUTO_COMMIT_INTERVAL_MS=
# Defaults to 30000
KAFKA_SESSION_TIMEOUT_MS=

# MONGODB Variables
MONGO_IP=${DOCKER_HOST_IP}
Expand Down
17 changes: 13 additions & 4 deletions src/main/java/us/dot/its/jpo/ode/aws/depositor/AwsDepositor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
Expand Down Expand Up @@ -87,6 +88,10 @@ public class AwsDepositor {
private String AWS_SESSION_TOKEN;
private String AWS_EXPIRATION;

private String KAFKA_ENABLE_AUTO_COMMIT;
private String KAFKA_AUTO_COMMIT_INTERVAL_MS;
private String KAFKA_SESSION_TIMEOUT_MS;

public static void main(String[] args) throws Exception {
AwsDepositor awsDepositor = new AwsDepositor();
awsDepositor.run(args);
Expand Down Expand Up @@ -149,10 +154,14 @@ public void run(String[] args) throws Exception {
addConfluentProperties(props);
}

KAFKA_AUTO_COMMIT_INTERVAL_MS = getEnvironmentVariable("KAFKA_AUTO_COMMIT_INTERVAL_MS", "1000");
KAFKA_ENABLE_AUTO_COMMIT = getEnvironmentVariable("KAFKA_ENABLE_AUTO_COMMIT", "false");
KAFKA_SESSION_TIMEOUT_MS = getEnvironmentVariable("KAFKA_SESSION_TIMEOUT_MS", "30000");

props.put("group.id", group);
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("enable.auto.commit", KAFKA_ENABLE_AUTO_COMMIT);
props.put("auto.commit.interval.ms", KAFKA_AUTO_COMMIT_INTERVAL_MS);
props.put("session.timeout.ms", KAFKA_SESSION_TIMEOUT_MS);


boolean depositToS3 = false;
Expand All @@ -176,7 +185,7 @@ public void run(String[] args) throws Exception {
boolean gotMessages = false;

while (true) {
ConsumerRecords<String, String> records = stringConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
ConsumerRecords<String, String> records = stringConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT_MS));
if (records != null && !records.isEmpty()) {
for (ConsumerRecord<String, String> record : records) {
try {
Expand Down

0 comments on commit 8bf4e57

Please sign in to comment.