From e40b78e3637dd73652d9b2ca69b6a5e2034bf63d Mon Sep 17 00:00:00 2001
From: Andrew Olson
Date: Mon, 7 Aug 2023 15:56:23 -0500
Subject: [PATCH] Wait for topic creation to complete in tests

---
 .../ProcessingKafkaConsumerITest.java         |  9 ++++----
 .../ProcessingKafkaConsumerRebalanceIT.java   |  5 ++--
 .../kafka/producer/KafkaProducerPoolTest.java |  4 ++--
 .../producer/KafkaProducerWrapperTest.java    | 23 +++++++++++--------
 4 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/common-kafka/src/test/java/com/cerner/common/kafka/consumer/ProcessingKafkaConsumerITest.java b/common-kafka/src/test/java/com/cerner/common/kafka/consumer/ProcessingKafkaConsumerITest.java
index 041d75f..e9d2eea 100644
--- a/common-kafka/src/test/java/com/cerner/common/kafka/consumer/ProcessingKafkaConsumerITest.java
+++ b/common-kafka/src/test/java/com/cerner/common/kafka/consumer/ProcessingKafkaConsumerITest.java
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.core.Is.is;
@@ -102,18 +103,18 @@ public void setTestName(TestInfo testInfo){
     }
 
     @Test
-    public void processingWithProcessingFailuresAndConsumerShutdown() throws IOException, InterruptedException {
+    public void processingWithProcessingFailuresAndConsumerShutdown() throws InterruptedException, ExecutionException {
         runProcessing(true); // shutdown consumers
     }
 
     @Test
-    public void processingWithProcessingFailures() throws IOException, InterruptedException {
+    public void processingWithProcessingFailures() throws InterruptedException, ExecutionException {
         runProcessing(false); // don't shutdown consumers
     }
 
     // If you need to debug this add 'log4j.logger.com.cerner.common.kafka=DEBUG' to log4j.properties in src/test/resources
     // Save output to a file as there will be a lot
-    public void runProcessing(boolean shutdownConsumers) throws IOException, InterruptedException {
+    public void runProcessing(boolean shutdownConsumers) throws InterruptedException, ExecutionException {
         AtomicBoolean finishedProcessing = new AtomicBoolean(false);
 
         Map> recordHistory = new ConcurrentHashMap<>();
@@ -126,7 +127,7 @@ public void runProcessing(boolean shutdownConsumers) throws IOException, Interru
             topicNames.add(topicName);
             topicList.add(new NewTopic(topicName, PARTITIONS, (short) 1));
         }
-        kafkaAdminClient.createTopics(topicList);
+        kafkaAdminClient.createTopics(topicList).all().get();
 
         // Setup consumer threads
         Properties consumerProperties = new Properties();
diff --git a/common-kafka/src/test/java/com/cerner/common/kafka/consumer/ProcessingKafkaConsumerRebalanceIT.java b/common-kafka/src/test/java/com/cerner/common/kafka/consumer/ProcessingKafkaConsumerRebalanceIT.java
index dff8c86..f238fab 100644
--- a/common-kafka/src/test/java/com/cerner/common/kafka/consumer/ProcessingKafkaConsumerRebalanceIT.java
+++ b/common-kafka/src/test/java/com/cerner/common/kafka/consumer/ProcessingKafkaConsumerRebalanceIT.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -101,7 +102,7 @@ public static void shutdown() throws Exception {
 
     @Test
     @Timeout(300)
-    public void deadlockFreeProcessingAfterMissedGeneration() throws IOException, InterruptedException {
+    public void deadlockFreeProcessingAfterMissedGeneration() throws InterruptedException, ExecutionException {
 
         Map> recordHistory = new ConcurrentHashMap<>();
 
@@ -110,7 +111,7 @@ public void deadlockFreeProcessingAfterMissedGeneration() throws IOException, In
         topicSet.add(new NewTopic(topicName, 1, (short) 1)); // Only 1 replica since our testing only has 1 broker
 
-        kafkaAdminClient.createTopics(topicSet);
+        kafkaAdminClient.createTopics(topicSet).all().get();
 
         // Setup consumer threads
         Properties consumerProperties = new Properties();
diff --git a/common-kafka/src/test/java/com/cerner/common/kafka/producer/KafkaProducerPoolTest.java b/common-kafka/src/test/java/com/cerner/common/kafka/producer/KafkaProducerPoolTest.java
index 61315a1..be847e8 100644
--- a/common-kafka/src/test/java/com/cerner/common/kafka/producer/KafkaProducerPoolTest.java
+++ b/common-kafka/src/test/java/com/cerner/common/kafka/producer/KafkaProducerPoolTest.java
@@ -247,13 +247,13 @@ public void producerRotationOverflow() throws IOException {
         mockPool.close();
     }
 
-    private void messageProduction(Properties config) throws InterruptedException, KafkaExecutionException, ExecutionException {
+    private void messageProduction(Properties config) throws InterruptedException, ExecutionException {
         String topicName = "topic_" + UUID.randomUUID().toString();
         NewTopic topic = new NewTopic(topicName, 4, (short) 1);
         Set<NewTopic> topics = new HashSet<>();
         topics.add(topic);
-        kafkaAdminClient.createTopics(topics);
+        kafkaAdminClient.createTopics(topics).all().get();
 
         Producer producer = pool.getProducer(config);
 
diff --git a/common-kafka/src/test/java/com/cerner/common/kafka/producer/KafkaProducerWrapperTest.java b/common-kafka/src/test/java/com/cerner/common/kafka/producer/KafkaProducerWrapperTest.java
index bac0f60..0ef15d3 100644
--- a/common-kafka/src/test/java/com/cerner/common/kafka/producer/KafkaProducerWrapperTest.java
+++ b/common-kafka/src/test/java/com/cerner/common/kafka/producer/KafkaProducerWrapperTest.java
@@ -94,7 +94,8 @@ public void setup(TestInfo testInfo){
     }
 
     @Test
-    public void test_messageSentSynchronouslySuccessfully() throws IOException {
+    public void test_messageSentSynchronouslySuccessfully() throws IOException, InterruptedException,
+            ExecutionException {
         long previousSendCount = KafkaProducerWrapper.SEND_TIMER.count();
         long previousSyncSendCount = KafkaProducerWrapper.SYNC_SEND_TIMER.count();
         long previousFlushCount = KafkaProducerWrapper.FLUSH_TIMER.count();
@@ -103,7 +104,7 @@ public void test_messageSentSynchronouslySuccessfully() throws IOException {
         Set<NewTopic> topics = new HashSet<>();
         topics.add(new NewTopic(topicName, 4, (short) 1));
 
-        kafkaAdminClient.createTopics(topics);
+        kafkaAdminClient.createTopics(topics).all().get();
 
         Properties props = KafkaTests.getProps();
         props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
@@ -125,7 +126,8 @@ public void test_messageSentSynchronouslySuccessfully() throws IOException {
     }
 
     @Test
-    public void test_multipleMessagesSentSynchronouslySuccessfully() throws IOException {
+    public void test_multipleMessagesSentSynchronouslySuccessfully() throws IOException, InterruptedException,
+            ExecutionException {
         long previousSendCount = KafkaProducerWrapper.SEND_TIMER.count();
         long previousSyncSendCount = KafkaProducerWrapper.SYNC_SEND_TIMER.count();
         long previousFlushCount = KafkaProducerWrapper.FLUSH_TIMER.count();
@@ -134,7 +136,7 @@ public void test_multipleMessagesSentSynchronouslySuccessfully() throws IOExcept
         Set<NewTopic> topics = new HashSet<>();
         topics.add(new NewTopic(topicName, 4, (short) 1));
 
-        kafkaAdminClient.createTopics(topics);
+        kafkaAdminClient.createTopics(topics).all().get();
 
         Properties props = KafkaTests.getProps();
         props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
@@ -198,7 +200,7 @@ public void test_flushFutureExecutionException() throws IOException, ExecutionEx
     }
 
     @Test
-    public void test_messageSentSuccessfully() throws IOException {
+    public void test_messageSentSuccessfully() throws IOException, InterruptedException, ExecutionException {
         long previousSendCount = KafkaProducerWrapper.SEND_TIMER.count();
         long previousSyncSendCount = KafkaProducerWrapper.SYNC_SEND_TIMER.count();
         long previousFlushCount = KafkaProducerWrapper.FLUSH_TIMER.count();
@@ -207,7 +209,7 @@ public void test_messageSentSuccessfully() throws IOException {
         Set<NewTopic> topics = new HashSet<>();
         topics.add(new NewTopic(topicName, 4, (short) 1));
 
-        kafkaAdminClient.createTopics(topics);
+        kafkaAdminClient.createTopics(topics).all().get();
 
         Properties props = KafkaTests.getProps();
         props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
@@ -272,10 +274,10 @@ public void test_WrapperNotCloseProducer() throws IOException, ExecutionExceptio
     }
 
     @Test
-    public void testSynchronous_messageTooLarge() throws IOException {
+    public void testSynchronous_messageTooLarge() throws IOException, InterruptedException, ExecutionException {
         Set<NewTopic> topics = new HashSet<>();
         topics.add(new NewTopic(topicName, 4, (short) 1));
-        kafkaAdminClient.createTopics(topics);
+        kafkaAdminClient.createTopics(topics).all().get();
 
         Properties props = KafkaTests.getProps();
         props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
@@ -312,11 +314,12 @@ public void testSynchronous_messageTooLarge() throws IOException {
     }
 
     @Test
-    public void test_messageSentSuccessfullyEvenWithFailure() throws IOException {
+    public void test_messageSentSuccessfullyEvenWithFailure() throws IOException, InterruptedException,
+            ExecutionException {
         Set<NewTopic> topics = new HashSet<>();
         topics.add(new NewTopic(topicName, 4, (short) 1));
-        kafkaAdminClient.createTopics(topics);
+        kafkaAdminClient.createTopics(topics).all().get();
 
         Properties props = getProps();
         props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());