diff --git a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java index b84ddd456e..d32259236b 100644 --- a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java +++ b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaConsumer.java @@ -22,11 +22,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedQueue; +import static java.util.Objects.nonNull; import static java.util.UUID.randomUUID; import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; @@ -44,19 +45,53 @@ public class KafkaConsumer extends AbstractSelectiveMessageConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); private org.apache.kafka.clients.consumer.KafkaConsumer consumer; + private final ConcurrentLinkedQueue> consumers = new ConcurrentLinkedQueue<>(); /** * Default constructor using endpoint. */ public KafkaConsumer(String name, KafkaEndpointConfiguration endpointConfiguration) { super(name, endpointConfiguration); - this.consumer = createConsumer(); + this.consumer = createManagedConsumer(); } + /** + * Initializes and provides a new {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance in a thread-safe manner. + * This method is the preferred way to obtain a consumer instance as it ensures proper lifecycle management and thread-safety. + *

+ * The created consumer is automatically registered for lifecycle management and cleanup. + * Each call to this method creates a new consumer instance. + * + * @return a new thread-safe {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance + */ + public org.apache.kafka.clients.consumer.KafkaConsumer createManagedConsumer() { + var consumer = createKafkaConsumer(); + consumers.add(consumer); + return consumer; + } + + /** + * Returns the current {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance. + * + * @return the current {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance + * @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is not thread-safe and manual consumer management is error-prone. + * Use {@link #createManagedConsumer()} instead to obtain properly managed consumer instances. + * This method will be removed in a future release. + */ + @Deprecated(forRemoval = true) public org.apache.kafka.clients.consumer.KafkaConsumer getConsumer() { return consumer; } + /** + * Sets the {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance. + * + * @param consumer the KafkaConsumer to set + * @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is not thread-safe and manual consumer management is error-prone. + * Use {@link #createManagedConsumer()} instead to obtain properly managed consumer instances. + * This method will be removed in a future release. + */ + @Deprecated(forRemoval = true) public void setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer consumer) { this.consumer = consumer; } @@ -65,7 +100,7 @@ public void setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer consumerToDelete; + while (nonNull(consumerToDelete = consumers.poll())) { + try { + consumerToDelete.unsubscribe(); + } finally { + consumerToDelete.close(); } - } finally { - consumer.close(Duration.ofMillis(10 * 1000L)); } } /** * Create new Kafka consumer with given endpoint configuration. */ - private org.apache.kafka.clients.consumer.KafkaConsumer createConsumer() { + private org.apache.kafka.clients.consumer.KafkaConsumer createKafkaConsumer() { Map consumerProps = new HashMap<>(); consumerProps.put(CLIENT_ID_CONFIG, Optional.ofNullable(getEndpointConfiguration().getClientId()).orElseGet(() -> KAFKA_PREFIX + "consumer_" + randomUUID())); consumerProps.put(GROUP_ID_CONFIG, getEndpointConfiguration().getConsumerGroup()); diff --git a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpoint.java b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpoint.java index c960e744d7..56118d2b9e 100644 --- a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpoint.java +++ b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaEndpoint.java @@ -190,7 +190,7 @@ public SimpleKafkaEndpointBuilder topic(String topic) { } public KafkaEndpoint build() { - return KafkaEndpoint.newKafkaEndpoint(kafkaConsumer, kafkaProducer, randomConsumerGroup, server, timeout, topic); + return newKafkaEndpoint(kafkaConsumer, kafkaProducer, randomConsumerGroup, server, timeout, topic); } } } diff --git a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java index dfbbd304a0..0882cd5e0e 100644 --- a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java +++ b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/endpoint/KafkaEndpointTest.java @@ -122,9 +122,14 @@ public void newKafkaEndpoint_isAbleToCreateRandomConsumerGroup() { .startsWith(KAFKA_PREFIX) .hasSize(23) .containsPattern(".*[a-z]{10}$") + // Make sure the random group id is propagated to new consumers .satisfies( - // Additionally make sure that gets passed downstream groupId -> assertThat(fixture.createConsumer().getConsumer()) + .extracting("delegate") + .extracting("groupId") + .asInstanceOf(OPTIONAL) + .hasValue(groupId), + groupId -> assertThat(fixture.createConsumer().createManagedConsumer()) .extracting("delegate") .extracting("groupId") .asInstanceOf(OPTIONAL)