Skip to content
This repository has been archived by the owner on Mar 26, 2024. It is now read-only.

Wait for topic creation to complete in tests #53

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.core.Is.is;
Expand Down Expand Up @@ -102,18 +103,18 @@ public void setTestName(TestInfo testInfo){
}

@Test
public void processingWithProcessingFailuresAndConsumerShutdown() throws IOException, InterruptedException {
public void processingWithProcessingFailuresAndConsumerShutdown() throws InterruptedException, ExecutionException {
runProcessing(true); // shutdown consumers
}

@Test
public void processingWithProcessingFailures() throws IOException, InterruptedException {
public void processingWithProcessingFailures() throws InterruptedException, ExecutionException {
runProcessing(false); // don't shutdown consumers
}

// If you need to debug this add 'log4j.logger.com.cerner.common.kafka=DEBUG' to log4j.properties in src/test/resources
// Save output to a file as there will be a lot
public void runProcessing(boolean shutdownConsumers) throws IOException, InterruptedException {
public void runProcessing(boolean shutdownConsumers) throws InterruptedException, ExecutionException {
AtomicBoolean finishedProcessing = new AtomicBoolean(false);
Map<RecordId, List<ConsumerAction>> recordHistory = new ConcurrentHashMap<>();

Expand All @@ -126,7 +127,7 @@ public void runProcessing(boolean shutdownConsumers) throws IOException, Interru
topicNames.add(topicName);
topicList.add(new NewTopic(topicName, PARTITIONS, (short) 1));
}
kafkaAdminClient.createTopics(topicList);
kafkaAdminClient.createTopics(topicList).all().get();

// Setup consumer threads
Properties consumerProperties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -101,7 +102,7 @@ public static void shutdown() throws Exception {

@Test
@Timeout(300)
public void deadlockFreeProcessingAfterMissedGeneration() throws IOException, InterruptedException {
public void deadlockFreeProcessingAfterMissedGeneration() throws InterruptedException, ExecutionException {

Map<RecordId, List<ConsumerAction>> recordHistory = new ConcurrentHashMap<>();

Expand All @@ -110,7 +111,7 @@ public void deadlockFreeProcessingAfterMissedGeneration() throws IOException, In
topicSet.add(new NewTopic(topicName, 1, (short) 1));

// Only 1 replica since our testing only has 1 broker
kafkaAdminClient.createTopics(topicSet);
kafkaAdminClient.createTopics(topicSet).all().get();

// Setup consumer threads
Properties consumerProperties = new Properties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,13 @@ public void producerRotationOverflow() throws IOException {
mockPool.close();
}

private void messageProduction(Properties config) throws InterruptedException, KafkaExecutionException, ExecutionException {
private void messageProduction(Properties config) throws InterruptedException, ExecutionException {
String topicName = "topic_" + UUID.randomUUID().toString();
NewTopic topic = new NewTopic(topicName, 4, (short) 1);
Set<NewTopic> topics = new HashSet<>();
topics.add(topic);

kafkaAdminClient.createTopics(topics);
kafkaAdminClient.createTopics(topics).all().get();

Producer<String, String> producer = pool.getProducer(config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public void setup(TestInfo testInfo){
}

@Test
public void test_messageSentSynchronouslySuccessfully() throws IOException {
public void test_messageSentSynchronouslySuccessfully() throws IOException, InterruptedException,
ExecutionException {
long previousSendCount = KafkaProducerWrapper.SEND_TIMER.count();
long previousSyncSendCount = KafkaProducerWrapper.SYNC_SEND_TIMER.count();
long previousFlushCount = KafkaProducerWrapper.FLUSH_TIMER.count();
Expand All @@ -103,7 +104,7 @@ public void test_messageSentSynchronouslySuccessfully() throws IOException {

Set<NewTopic> topics = new HashSet<>();
topics.add(new NewTopic(topicName, 4, (short) 1));
kafkaAdminClient.createTopics(topics);
kafkaAdminClient.createTopics(topics).all().get();

Properties props = KafkaTests.getProps();
props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand All @@ -125,7 +126,8 @@ public void test_messageSentSynchronouslySuccessfully() throws IOException {
}

@Test
public void test_multipleMessagesSentSynchronouslySuccessfully() throws IOException {
public void test_multipleMessagesSentSynchronouslySuccessfully() throws IOException, InterruptedException,
ExecutionException {
long previousSendCount = KafkaProducerWrapper.SEND_TIMER.count();
long previousSyncSendCount = KafkaProducerWrapper.SYNC_SEND_TIMER.count();
long previousFlushCount = KafkaProducerWrapper.FLUSH_TIMER.count();
Expand All @@ -134,7 +136,7 @@ public void test_multipleMessagesSentSynchronouslySuccessfully() throws IOExcept

Set<NewTopic> topics = new HashSet<>();
topics.add(new NewTopic(topicName, 4, (short) 1));
kafkaAdminClient.createTopics(topics);
kafkaAdminClient.createTopics(topics).all().get();

Properties props = KafkaTests.getProps();
props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand Down Expand Up @@ -198,7 +200,7 @@ public void test_flushFutureExecutionException() throws IOException, ExecutionEx
}

@Test
public void test_messageSentSuccessfully() throws IOException {
public void test_messageSentSuccessfully() throws IOException, InterruptedException, ExecutionException {
long previousSendCount = KafkaProducerWrapper.SEND_TIMER.count();
long previousSyncSendCount = KafkaProducerWrapper.SYNC_SEND_TIMER.count();
long previousFlushCount = KafkaProducerWrapper.FLUSH_TIMER.count();
Expand All @@ -207,7 +209,7 @@ public void test_messageSentSuccessfully() throws IOException {

Set<NewTopic> topics = new HashSet<>();
topics.add(new NewTopic(topicName, 4, (short) 1));
kafkaAdminClient.createTopics(topics);
kafkaAdminClient.createTopics(topics).all().get();

Properties props = KafkaTests.getProps();
props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand Down Expand Up @@ -272,10 +274,10 @@ public void test_WrapperNotCloseProducer() throws IOException, ExecutionExceptio
}

@Test
public void testSynchronous_messageTooLarge() throws IOException {
public void testSynchronous_messageTooLarge() throws IOException, InterruptedException, ExecutionException {
Set<NewTopic> topics = new HashSet<>();
topics.add(new NewTopic(topicName, 4, (short) 1));
kafkaAdminClient.createTopics(topics);
kafkaAdminClient.createTopics(topics).all().get();

Properties props = KafkaTests.getProps();
props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand Down Expand Up @@ -312,11 +314,12 @@ public void testSynchronous_messageTooLarge() throws IOException {
}

@Test
public void test_messageSentSuccessfullyEvenWithFailure() throws IOException {
public void test_messageSentSuccessfullyEvenWithFailure() throws IOException, InterruptedException,
ExecutionException {

Set<NewTopic> topics = new HashSet<>();
topics.add(new NewTopic(topicName, 4, (short) 1));
kafkaAdminClient.createTopics(topics);
kafkaAdminClient.createTopics(topics).all().get();

Properties props = getProps();
props.setProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Expand Down