From bf69def3bf15600e615a668275c35d1a56959f36 Mon Sep 17 00:00:00 2001 From: Timon Borter Date: Mon, 16 Dec 2024 19:07:38 +0100 Subject: [PATCH] fix(#1281): introduce separate consumers per subscription --- .../kafka/endpoint/KafkaConsumer.java | 66 +++++++++-- .../kafka/endpoint/KafkaEndpoint.java | 30 ++++- .../kafka/endpoint/KafkaConsumerTest.java | 105 ++++++++++++++++-- .../kafka/endpoint/KafkaEndpointTest.java | 7 +- .../integration/KafkaEndpointFactoryIT.java | 1 - 5 files changed, 184 insertions(+), 25 deletions(-) diff --git a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java index b84ddd456e..4dec903aaa 100644 --- a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java +++ b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java @@ -22,11 +22,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedQueue; +import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; import static java.util.UUID.randomUUID; import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; @@ -44,28 +46,71 @@ public class KafkaConsumer extends AbstractSelectiveMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); private org.apache.kafka.clients.consumer.KafkaConsumer consumer; + private final ConcurrentLinkedQueue> managedConsumers = new ConcurrentLinkedQueue<>(); /** * Default constructor using endpoint. */ public KafkaConsumer(String name, KafkaEndpointConfiguration endpointConfiguration) { super(name, endpointConfiguration); - this.consumer = createConsumer(); } + /** + * Initializes and provides a new {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance in a thread-safe manner. + * This method is the preferred way to obtain a consumer instance as it ensures proper lifecycle management and thread-safety. + *

+ * The created consumer is automatically registered for lifecycle management and cleanup. + * Each call to this method creates a new consumer instance. + * + * @return a new thread-safe {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance + */ + public org.apache.kafka.clients.consumer.KafkaConsumer createManagedConsumer() { + if (nonNull(consumer)) { + return consumer; + } + + var managedConsumer = createKafkaConsumer(); + managedConsumers.add(managedConsumer); + return managedConsumer; + } + + /** + * Returns the current {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance. + * If non exists, a managed one is being created (see {@link #createManagedConsumer()}). + * + * @return the current {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance + * @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is not thread-safe and manual consumer management is error-prone. + * Use {@link #createManagedConsumer()} instead to obtain properly managed consumer instances. + * This method will be removed in a future release. + */ + @Deprecated(forRemoval = true) public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() { + if (isNull(consumer)) { + consumer = createKafkaConsumer(); + } + return consumer; } + /** + * Sets the {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance. + * + * @param consumer the KafkaConsumer to set + * @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is not thread-safe and manual consumer management is error-prone. + * Use {@link #createManagedConsumer()} instead to obtain properly managed consumer instances. + * This method will be removed in a future release. + */ + @Deprecated(forRemoval = true) public void setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer consumer) { this.consumer = consumer; + this.managedConsumers.add(consumer); } @Override public Message receive(TestContext testContext, long timeout) { logger.debug("Receiving single message"); return KafkaMessageSingleConsumer.builder() - .consumer(consumer) + .consumer(getConsumer()) .endpointConfiguration(getEndpointConfiguration()) .build() .receive(testContext, timeout); @@ -75,7 +120,7 @@ public Message receive(TestContext testContext, long timeout) { public Message receive(String selector, TestContext testContext, long timeout) { logger.debug("Receiving selected message: {}", selector); return KafkaMessageFilteringConsumer.builder() - .consumer(consumer) + .consumer(createManagedConsumer()) .endpointConfiguration(getEndpointConfiguration()) .build() .receive(selector, testContext, timeout); @@ -90,19 +135,20 @@ protected KafkaEndpointConfiguration getEndpointConfiguration() { * Stop message listener container. */ public void stop() { - try { - if (consumer.subscription() != null && !consumer.subscription().isEmpty()) { - consumer.unsubscribe(); + org.apache.kafka.clients.consumer.KafkaConsumer consumerToDelete; + while (nonNull(consumerToDelete = managedConsumers.poll())) { + try { + consumerToDelete.unsubscribe(); + } finally { + consumerToDelete.close(); } - } finally { - consumer.close(Duration.ofMillis(10 * 1000L)); } } /** * Create new Kafka consumer with given endpoint configuration. */ - private org.apache.kafka.clients.consumer.KafkaConsumer createConsumer() { + private org.apache.kafka.clients.consumer.KafkaConsumer createKafkaConsumer() { Map consumerProps = new HashMap<>(); consumerProps.put(CLIENT_ID_CONFIG, Optional.ofNullable(getEndpointConfiguration().getClientId()).orElseGet(() -> KAFKA_PREFIX + "consumer_" + randomUUID())); consumerProps.put(GROUP_ID_CONFIG, getEndpointConfiguration().getConsumerGroup()); diff --git a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpoint.java b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpoint.java index c960e744d7..e1998b21f7 100644 --- a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpoint.java +++ b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpoint.java @@ -65,8 +65,6 @@ public KafkaEndpoint(KafkaEndpointConfiguration endpointConfiguration) { } static KafkaEndpoint newKafkaEndpoint( - @Nullable org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumer, - @Nullable org.apache.kafka.clients.producer.KafkaProducer kafkaProducer, @Nullable Boolean randomConsumerGroup, @Nullable String server, @Nullable Long timeout, @@ -76,7 +74,7 @@ static KafkaEndpoint newKafkaEndpoint( if (TRUE.equals(randomConsumerGroup)) { kafkaEndpoint.getEndpointConfiguration() - .setConsumerGroup(KAFKA_PREFIX + RandomStringUtils.insecure().nextAlphabetic(10).toLowerCase()); + .setConsumerGroup(KAFKA_PREFIX + RandomStringUtils.insecure().nextAlphabetic(10).toLowerCase()); } if (hasText(server)) { kafkaEndpoint.getEndpointConfiguration().setServer(server); @@ -88,6 +86,25 @@ static KafkaEndpoint newKafkaEndpoint( kafkaEndpoint.getEndpointConfiguration().setTopic(topic); } + return kafkaEndpoint; + } + + /** + * @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is not thread-safe and manual consumer management is error-prone. + * Use {@link #newKafkaEndpoint(Boolean, String, Long, String)} instead to obtain properly managed consumer instances. + * This method will be removed in a future release. + */ + @Deprecated(forRemoval = true) + static KafkaEndpoint newKafkaEndpoint( + @Nullable org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumer, + @Nullable org.apache.kafka.clients.producer.KafkaProducer kafkaProducer, + @Nullable Boolean randomConsumerGroup, + @Nullable String server, + @Nullable Long timeout, + @Nullable String topic + ) { + var kafkaEndpoint = newKafkaEndpoint(randomConsumerGroup, server, timeout, topic); + // Make sure these come at the end, so endpoint configuration is already initialized if (nonNull(kafkaConsumer)) { kafkaEndpoint.createConsumer().setConsumer(kafkaConsumer); @@ -159,6 +176,11 @@ public static class SimpleKafkaEndpointBuilder { private Long timeout; private String topic; + /** + * @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is not thread-safe and manual consumer management is error-prone. + * This method will be removed in a future release. + */ + @Deprecated(forRemoval = true) public SimpleKafkaEndpointBuilder kafkaConsumer(org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumer) { this.kafkaConsumer = kafkaConsumer; return this; @@ -190,7 +212,7 @@ public SimpleKafkaEndpointBuilder topic(String topic) { } public KafkaEndpoint build() { - return KafkaEndpoint.newKafkaEndpoint(kafkaConsumer, kafkaProducer, randomConsumerGroup, server, timeout, topic); + return newKafkaEndpoint(kafkaConsumer, kafkaProducer, randomConsumerGroup, server, timeout, topic); } } } diff --git a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaConsumerTest.java b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaConsumerTest.java index c1efe6414a..cc910caa4a 100644 --- a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaConsumerTest.java +++ b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaConsumerTest.java @@ -36,10 +36,14 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -51,7 +55,7 @@ public class KafkaConsumerTest extends AbstractTestNGUnitTest { private final org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumerMock = mock(KafkaConsumer.class); @Test - public void testReceiveMessage() { + public void receiveMessage() { String topic = "default"; KafkaEndpoint endpoint = KafkaEndpoint.builder() @@ -59,7 +63,7 @@ public void testReceiveMessage() { .topic(topic) .build(); - TopicPartition partition = new TopicPartition(topic, 0); + var partition = new TopicPartition(topic, 0); reset(kafkaConsumerMock); @@ -83,7 +87,7 @@ public void testReceiveMessage() { } @Test - public void testReceiveMessage_inRandomConsumerGroup() { + public void receiveMessage_inRandomConsumerGroup() { String topic = "default"; KafkaEndpoint endpoint = KafkaEndpoint.builder() @@ -91,7 +95,7 @@ public void testReceiveMessage_inRandomConsumerGroup() { .topic(topic) .build(); - TopicPartition partition = new TopicPartition(topic, 0); + var partition = new TopicPartition(topic, 0); reset(kafkaConsumerMock); @@ -115,7 +119,7 @@ public void testReceiveMessage_inRandomConsumerGroup() { } @Test - public void testReceiveMessageTimeout() { + public void receiveMessage_runIntoTimeout() { String topic = "test"; KafkaEndpoint endpoint = KafkaEndpoint.builder() @@ -140,7 +144,7 @@ public void testReceiveMessageTimeout() { } @Test - public void testWithCustomTimeout() { + public void receiveMessage_customTimeout_runIntoTimeout() { String topic = "timeout"; KafkaEndpoint endpoint = KafkaEndpoint.builder() @@ -149,7 +153,7 @@ public void testWithCustomTimeout() { .topic(topic) .build(); - TopicPartition partition = new TopicPartition(topic, 0); + var partition = new TopicPartition(topic, 0); reset(kafkaConsumerMock); when(kafkaConsumerMock.subscription()).thenReturn(singleton(topic)); @@ -165,7 +169,7 @@ public void testWithCustomTimeout() { } @Test - public void testWithMessageHeaders() { + public void receiveMessage_withMessageHeaders() { String topic = "headers"; KafkaEndpoint endpoint = KafkaEndpoint.builder() @@ -174,7 +178,7 @@ public void testWithMessageHeaders() { .topic(topic) .build(); - TopicPartition partition = new TopicPartition(topic, 0); + var partition = new TopicPartition(topic, 0); reset(kafkaConsumerMock); when(kafkaConsumerMock.subscription()).thenReturn(singleton(topic)); @@ -193,4 +197,87 @@ public void testWithMessageHeaders() { assertNotNull(receivedMessage.getHeader("Operation")); assertEquals(receivedMessage.getHeader("Operation"), "sayHello"); } + + @Test + public void getConsumer_returnsSetConsumer() { + var kafkaConsumerMock = mock(KafkaConsumer.class); + KafkaEndpoint endpoint = KafkaEndpoint.builder() + .kafkaConsumer(kafkaConsumerMock) + .build(); + + var result = endpoint.createConsumer().getConsumer(); + assertThat(result) + .isEqualTo(kafkaConsumerMock); + } + + @Test + public void getConsumer_createsConsumerIfNonSet() { + KafkaEndpoint endpoint = KafkaEndpoint.builder() + .kafkaConsumer(null) // null for explicity + .build(); + + var result = endpoint.createConsumer().getConsumer(); + assertThat(result) + .isNotNull(); + } + + @Test + public void createManagedConsumer_createsDifferentManagedConsumers() { + KafkaEndpoint endpoint = KafkaEndpoint.builder() + .build(); + + var managedConsumer1 = endpoint.createConsumer().createManagedConsumer(); + assertThat(managedConsumer1) + .isNotNull(); + + var managedConsumer2 = endpoint.createConsumer().createManagedConsumer(); + + assertThat(managedConsumer2) + .isNotNull() + .isNotEqualTo(managedConsumer1) + .isNotSameAs(managedConsumer1); + } + + @Test + @SuppressWarnings({"unchecked"}) + public void createManagedConsumer_returnsConsumerIfOneIsSet() { + var kafkaConsumerMock = mock(KafkaConsumer.class); + KafkaEndpoint endpoint = KafkaEndpoint.builder() + .kafkaConsumer(kafkaConsumerMock) + .build(); + + var result = endpoint.createConsumer().createManagedConsumer(); + assertThat(result) + .isEqualTo(kafkaConsumerMock); + } + + @Test + @SuppressWarnings({"unchecked"}) + public void stop_unsubscribesAndClosesConsumer() { + var kafkaConsumerMock = mock(KafkaConsumer.class); + KafkaEndpoint endpoint = KafkaEndpoint.builder() + .kafkaConsumer(kafkaConsumerMock) + .build(); + + endpoint.createConsumer().stop(); + verify(kafkaConsumerMock).unsubscribe(); + verify(kafkaConsumerMock).close(); + } + + @Test + @SuppressWarnings({"unchecked"}) + public void stop_closesConsumerEvenAfterUnsubscriptionError() { + var kafkaConsumerMock = mock(KafkaConsumer.class); + var unsubscribeException = new RuntimeException(); + doThrow(unsubscribeException).when(kafkaConsumerMock).unsubscribe(); + + KafkaEndpoint endpoint = KafkaEndpoint.builder() + .kafkaConsumer(kafkaConsumerMock) + .build(); + + assertThatThrownBy(() -> endpoint.createConsumer().stop()) + .isEqualTo(unsubscribeException); + + verify(kafkaConsumerMock).close(); + } } diff --git a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java index dfbbd304a0..0882cd5e0e 100644 --- a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java +++ b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java @@ -122,9 +122,14 @@ public void newKafkaEndpoint_isAbleToCreateRandomConsumerGroup() { .startsWith(KAFKA_PREFIX) .hasSize(23) .containsPattern(".*[a-z]{10}$") + // Make sure the random group id is propagated to new consumers .satisfies( - // Additionally make sure that gets passed downstream groupId -> assertThat(fixture.createConsumer().getConsumer()) + .extracting("delegate") + .extracting("groupId") + .asInstanceOf(OPTIONAL) + .hasValue(groupId), + groupId -> assertThat(fixture.createConsumer().createManagedConsumer()) .extracting("delegate") .extracting("groupId") .asInstanceOf(OPTIONAL) diff --git a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointFactoryIT.java b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointFactoryIT.java index 9cbf45ddc3..0a55dada77 100644 --- a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointFactoryIT.java +++ b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointFactoryIT.java @@ -27,4 +27,3 @@ public class KafkaEndpointFactoryIT extends TestNGCitrusSpringSupport { @CitrusTestSource(type = TestLoader.SPRING, name = "KafkaEndpointFactoryIT") public void testKafkaEndpointFactory() {} } -