Skip to content

Commit

Permalink
fix(#1281): introduce separate consumers per subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
bbortt committed Dec 17, 2024
1 parent 36a0f74 commit 5c41054
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;

import static java.util.Objects.nonNull;
import static java.util.UUID.randomUUID;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
Expand All @@ -44,19 +45,53 @@ public class KafkaConsumer extends AbstractSelectiveMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer;
private final ConcurrentLinkedQueue<org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object>> consumers = new ConcurrentLinkedQueue<>();

/**
 * Default constructor using endpoint.
 * <p>
 * Eagerly creates the initial, lifecycle-managed consumer so that the deprecated
 * {@link #getConsumer()} accessor keeps returning a usable instance.
 */
public KafkaConsumer(String name, KafkaEndpointConfiguration endpointConfiguration) {
    super(name, endpointConfiguration);
    // NOTE(review): this invokes the public, overridable createManagedConsumer() from the
    // constructor — a subclass override would run before the subclass is initialized.
    // Consider making createManagedConsumer() final. TODO confirm no subclass relies on this.
    this.consumer = createManagedConsumer();
}

/**
 * Creates and returns a fresh {@link org.apache.kafka.clients.consumer.KafkaConsumer}, registering
 * it for automatic lifecycle management.
 * <p>
 * This is the preferred way to obtain a consumer: every invocation yields a brand-new instance,
 * and each registered instance is unsubscribed and closed when this endpoint consumer stops.
 *
 * @return a new, lifecycle-managed {@link org.apache.kafka.clients.consumer.KafkaConsumer}
 */
public org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> createManagedConsumer() {
    var managedConsumer = createKafkaConsumer();
    consumers.add(managedConsumer);
    return managedConsumer;
}

/**
 * Returns the current {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance.
 * <p>
 * Note that the returned instance is shared state of this endpoint consumer — Kafka consumers
 * are <b>not</b> safe for concurrent use from multiple threads.
 *
 * @return the current {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance
 * @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is <b>not</b> thread-safe and manual consumer management is error-prone.
 * Use {@link #createManagedConsumer()} instead to obtain properly managed consumer instances.
 * This method will be removed in a future release.
 */
@Deprecated(forRemoval = true)
public org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> getConsumer() {
    return consumer;
}

/**
 * Sets the {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance.
 * <p>
 * A non-null consumer passed in here is additionally registered for lifecycle management, so
 * that {@code stop()} unsubscribes and closes it along with all internally created consumers.
 *
 * @param consumer the KafkaConsumer to set; may be {@code null} to clear the current instance
 * @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is <b>not</b> thread-safe and manual consumer management is error-prone.
 * Use {@link #createManagedConsumer()} instead to obtain properly managed consumer instances.
 * This method will be removed in a future release.
 */
@Deprecated(forRemoval = true)
public void setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer) {
    this.consumer = consumer;
    // Register externally supplied consumers as well — otherwise stop() would never
    // unsubscribe/close them and the underlying network resources would leak.
    if (nonNull(consumer)) {
        consumers.add(consumer);
    }
}
Expand All @@ -65,7 +100,7 @@ public void setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object,
public Message receive(TestContext testContext, long timeout) {
logger.debug("Receiving single message");
return KafkaMessageSingleConsumer.builder()
.consumer(consumer)
.consumer(createManagedConsumer())
.endpointConfiguration(getEndpointConfiguration())
.build()
.receive(testContext, timeout);
Expand All @@ -75,7 +110,7 @@ public Message receive(TestContext testContext, long timeout) {
public Message receive(String selector, TestContext testContext, long timeout) {
logger.debug("Receiving selected message: {}", selector);
return KafkaMessageFilteringConsumer.builder()
.consumer(consumer)
.consumer(createManagedConsumer())
.endpointConfiguration(getEndpointConfiguration())
.build()
.receive(selector, testContext, timeout);
Expand All @@ -90,19 +125,20 @@ protected KafkaEndpointConfiguration getEndpointConfiguration() {
* Stop message listener container.
*/
public void stop() {
try {
if (consumer.subscription() != null && !consumer.subscription().isEmpty()) {
consumer.unsubscribe();
org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumerToDelete;
while (nonNull(consumerToDelete = consumers.poll())) {
try {
consumerToDelete.unsubscribe();
} finally {
consumerToDelete.close();
}
} finally {
consumer.close(Duration.ofMillis(10 * 1000L));
}
}

/**
* Create new Kafka consumer with given endpoint configuration.
*/
private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> createConsumer() {
private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> createKafkaConsumer() {
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(CLIENT_ID_CONFIG, Optional.ofNullable(getEndpointConfiguration().getClientId()).orElseGet(() -> KAFKA_PREFIX + "consumer_" + randomUUID()));
consumerProps.put(GROUP_ID_CONFIG, getEndpointConfiguration().getConsumerGroup());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public SimpleKafkaEndpointBuilder topic(String topic) {
}

public KafkaEndpoint build() {
return KafkaEndpoint.newKafkaEndpoint(kafkaConsumer, kafkaProducer, randomConsumerGroup, server, timeout, topic);
return newKafkaEndpoint(kafkaConsumer, kafkaProducer, randomConsumerGroup, server, timeout, topic);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,14 @@ public void newKafkaEndpoint_isAbleToCreateRandomConsumerGroup() {
.startsWith(KAFKA_PREFIX)
.hasSize(23)
.containsPattern(".*[a-z]{10}$")
// Make sure the random group id is propagated to new consumers
.satisfies(
// Additionally make sure that it gets passed downstream
groupId -> assertThat(fixture.createConsumer().getConsumer())
.extracting("delegate")
.extracting("groupId")
.asInstanceOf(OPTIONAL)
.hasValue(groupId),
groupId -> assertThat(fixture.createConsumer().createManagedConsumer())
.extracting("delegate")
.extracting("groupId")
.asInstanceOf(OPTIONAL)
Expand Down

0 comments on commit 5c41054

Please sign in to comment.