kafka concurrent access #1282

Open · wants to merge 2 commits into main
GitHubIssue.java (new file)
@@ -0,0 +1,53 @@
/*
* Copyright the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.citrusframework.internal;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.SOURCE;

/**
* Annotation to reference a GitHub issue number associated with a test class or method.
* It serves as a linking mechanism between reproducers (tests proving certain behavior) and their corresponding GitHub issues.
* <p>
* Example usage:
* <pre>
* {@code @GitHubIssue(1234)
* public class MyTest {
*     // Class implementation
* }}
*
* {@code @GitHubIssue(5678)
* public void testMethod() {
*     // Method implementation
* }}
* </pre>
*/
@Retention(SOURCE)
@Target({METHOD, TYPE})
public @interface GitHubIssue {

/**
* The GitHub issue number to reference.
*
* @return the issue number in <a href="https://github.com/citrusframework/citrus/issues">the GitHub repository</a>
*/
int value();
}
KafkaConsumer.java
@@ -22,11 +22,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;

import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.UUID.randomUUID;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
@@ -44,28 +46,71 @@ public class KafkaConsumer extends AbstractSelectiveMessageConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer;
private final ConcurrentLinkedQueue<org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object>> managedConsumers = new ConcurrentLinkedQueue<>();

/**
* Default constructor using endpoint.
*/
public KafkaConsumer(String name, KafkaEndpointConfiguration endpointConfiguration) {
super(name, endpointConfiguration);
- this.consumer = createConsumer();
}

/**
* Initializes and provides a new {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance in a thread-safe manner.
* This method is the preferred way to obtain a consumer instance as it ensures proper lifecycle management and thread-safety.
* <p>
* The created consumer is automatically registered for lifecycle management and cleanup.
* Each call to this method creates a new consumer instance, unless a consumer has already been created or injected through the deprecated {@link #getConsumer()} or {@link #setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer)} accessors, in which case that shared instance is returned.
*
* @return a new thread-safe {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance
*/
public org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> createManagedConsumer() {
if (nonNull(consumer)) {
return consumer;
}

var managedConsumer = createKafkaConsumer();
managedConsumers.add(managedConsumer);
return managedConsumer;
}
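
Illustration (not part of this change set): a minimal sketch of how callers might use createManagedConsumer() so that every thread polls with its own Kafka client. The class and topic names are placeholders, the Citrus KafkaConsumer package is assumed to be org.citrusframework.kafka.endpoint, and the sketch assumes no shared consumer was injected through the deprecated accessors.

import java.time.Duration;
import java.util.List;

import org.citrusframework.kafka.endpoint.KafkaConsumer;

class ManagedConsumerSketch {

    void pollConcurrently(KafkaConsumer citrusConsumer) {
        Runnable pollTask = () -> {
            // Each call returns a fresh Kafka client that is registered for cleanup,
            // so no org.apache.kafka.clients.consumer.KafkaConsumer is shared across threads.
            var kafkaConsumer = citrusConsumer.createManagedConsumer();
            kafkaConsumer.subscribe(List.of("example-topic")); // placeholder topic
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // No manual close() here: citrusConsumer.stop() later drains and closes all managed consumers.
        };
        new Thread(pollTask).start();
        new Thread(pollTask).start();
    }
}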

/**
* Returns the current {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance.
* If none exists, a new one is created (see {@link #createManagedConsumer()}).
*
* @return the current {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance
* @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is <b>not</b> thread-safe and manual consumer management is error-prone.
* Use {@link #createManagedConsumer()} instead to obtain properly managed consumer instances.
* This method will be removed in a future release.
*/
@Deprecated(forRemoval = true)
public org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> getConsumer() {
if (isNull(consumer)) {
consumer = createKafkaConsumer();
}

return consumer;
}
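
Background for the deprecation (sketch, not part of this diff): the underlying Apache Kafka client guards itself against concurrent use and fails fast when two threads access it at the same time, which is what makes sharing a single instance error-prone. Broker address, group id and topic below are placeholders.

import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

class SharedConsumerPitfall {

    public static void main(String[] args) {
        var sharedConsumer = new KafkaConsumer<String, String>(Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ConsumerConfig.GROUP_ID_CONFIG, "sketch-group",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class));
        sharedConsumer.subscribe(List.of("example-topic"));

        Runnable poll = () -> sharedConsumer.poll(Duration.ofSeconds(5));
        new Thread(poll).start();
        // The second concurrent poll typically fails with java.util.ConcurrentModificationException:
        // "KafkaConsumer is not safe for multi-threaded access".
        new Thread(poll).start();
    }
}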

/**
* Sets the {@link org.apache.kafka.clients.consumer.KafkaConsumer} instance.
*
* @param consumer the KafkaConsumer to set
* @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is <b>not</b> thread-safe and manual consumer management is error-prone.
* Use {@link #createManagedConsumer()} instead to obtain properly managed consumer instances.
* This method will be removed in a future release.
*/
@Deprecated(forRemoval = true)
public void setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer) {
this.consumer = consumer;
this.managedConsumers.add(consumer);
}

@Override
public Message receive(TestContext testContext, long timeout) {
logger.debug("Receiving single message");
return KafkaMessageSingleConsumer.builder()
- .consumer(consumer)
+ .consumer(getConsumer())
.endpointConfiguration(getEndpointConfiguration())
.build()
.receive(testContext, timeout);
@@ -75,7 +120,7 @@ public Message receive(TestContext testContext, long timeout) {
public Message receive(String selector, TestContext testContext, long timeout) {
logger.debug("Receiving selected message: {}", selector);
return KafkaMessageFilteringConsumer.builder()
- .consumer(consumer)
+ .consumer(createManagedConsumer())
.endpointConfiguration(getEndpointConfiguration())
.build()
.receive(selector, testContext, timeout);
@@ -90,19 +135,20 @@ protected KafkaEndpointConfiguration getEndpointConfiguration() {
* Stop message listener container.
*/
public void stop() {
- try {
- if (consumer.subscription() != null && !consumer.subscription().isEmpty()) {
- consumer.unsubscribe();
+ org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumerToDelete;
+ while (nonNull(consumerToDelete = managedConsumers.poll())) {
+ try {
+ consumerToDelete.unsubscribe();
+ } finally {
+ consumerToDelete.close();
}
- } finally {
- consumer.close(Duration.ofMillis(10 * 1000L));
}
}
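
The loop above is an instance of a general drain-and-close idiom: registration is lock-free from any thread, and shutdown keeps polling the queue until it returns null. A generic sketch (illustrative, not part of this diff):

import java.util.concurrent.ConcurrentLinkedQueue;

class ResourceRegistry<T extends AutoCloseable> {

    private final ConcurrentLinkedQueue<T> resources = new ConcurrentLinkedQueue<>();

    void register(T resource) {
        resources.add(resource); // lock-free, safe to call from any thread
    }

    void closeAll() throws Exception {
        T resource;
        // poll() removes and returns the head, or null once the queue is empty, so resources
        // registered while this loop runs are either closed here or left for a later call.
        while ((resource = resources.poll()) != null) {
            resource.close();
        }
    }
}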

/**
* Create new Kafka consumer with given endpoint configuration.
*/
- private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> createConsumer() {
+ private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> createKafkaConsumer() {
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(CLIENT_ID_CONFIG, Optional.ofNullable(getEndpointConfiguration().getClientId()).orElseGet(() -> KAFKA_PREFIX + "consumer_" + randomUUID()));
consumerProps.put(GROUP_ID_CONFIG, getEndpointConfiguration().getConsumerGroup());
KafkaEndpoint.java
@@ -65,8 +65,6 @@ public KafkaEndpoint(KafkaEndpointConfiguration endpointConfiguration) {
}

static KafkaEndpoint newKafkaEndpoint(
- @Nullable org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer,
- @Nullable org.apache.kafka.clients.producer.KafkaProducer<Object, Object> kafkaProducer,
@Nullable Boolean randomConsumerGroup,
@Nullable String server,
@Nullable Long timeout,
@@ -76,7 +74,7 @@ static KafkaEndpoint newKafkaEndpoint(

if (TRUE.equals(randomConsumerGroup)) {
kafkaEndpoint.getEndpointConfiguration()
- .setConsumerGroup(KAFKA_PREFIX + RandomStringUtils.insecure().nextAlphabetic(10).toLowerCase());
+     .setConsumerGroup(KAFKA_PREFIX + RandomStringUtils.insecure().nextAlphabetic(10).toLowerCase());
}
if (hasText(server)) {
kafkaEndpoint.getEndpointConfiguration().setServer(server);
@@ -88,6 +86,25 @@ static KafkaEndpoint newKafkaEndpoint(
kafkaEndpoint.getEndpointConfiguration().setTopic(topic);
}

return kafkaEndpoint;
}

/**
* @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is <b>not</b> thread-safe and manual consumer management is error-prone.
* Use {@link #newKafkaEndpoint(Boolean, String, Long, String)} instead to obtain properly managed consumer instances.
* This method will be removed in a future release.
*/
@Deprecated(forRemoval = true)
static KafkaEndpoint newKafkaEndpoint(
@Nullable org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer,
@Nullable org.apache.kafka.clients.producer.KafkaProducer<Object, Object> kafkaProducer,
@Nullable Boolean randomConsumerGroup,
@Nullable String server,
@Nullable Long timeout,
@Nullable String topic
) {
var kafkaEndpoint = newKafkaEndpoint(randomConsumerGroup, server, timeout, topic);

// Make sure these come at the end, so endpoint configuration is already initialized
if (nonNull(kafkaConsumer)) {
kafkaEndpoint.createConsumer().setConsumer(kafkaConsumer);
@@ -159,6 +176,11 @@ public static class SimpleKafkaEndpointBuilder {
private Long timeout;
private String topic;

/**
* @deprecated {@link org.apache.kafka.clients.consumer.KafkaConsumer} is <b>not</b> thread-safe and manual consumer management is error-prone.
* This method will be removed in a future release.
*/
@Deprecated(forRemoval = true)
public SimpleKafkaEndpointBuilder kafkaConsumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
return this;
Expand Down Expand Up @@ -190,7 +212,7 @@ public SimpleKafkaEndpointBuilder topic(String topic) {
}

public KafkaEndpoint build() {
- return KafkaEndpoint.newKafkaEndpoint(kafkaConsumer, kafkaProducer, randomConsumerGroup, server, timeout, topic);
+ return newKafkaEndpoint(kafkaConsumer, kafkaProducer, randomConsumerGroup, server, timeout, topic);
}
}
}
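
A sketch of the construction path this change encourages: configure the endpoint and let it create and manage its own Kafka clients instead of handing in a pre-built consumer. The KafkaEndpoint package is assumed to be org.citrusframework.kafka.endpoint; the server(..), timeout(..) and randomConsumerGroup(..) setters and the no-arg builder constructor are assumed by analogy with the topic(..) setter shown above; broker and topic names are placeholders.

import org.citrusframework.kafka.endpoint.KafkaEndpoint;

class EndpointBuilderSketch {

    KafkaEndpoint buildEndpoint() {
        // No kafkaConsumer(..)/kafkaProducer(..) call: the endpoint creates managed clients on demand.
        return new KafkaEndpoint.SimpleKafkaEndpointBuilder()
                .randomConsumerGroup(true)  // assumed setter, mirrors the randomConsumerGroup parameter
                .server("localhost:9092")   // assumed setter; placeholder bootstrap server
                .timeout(5_000L)            // assumed setter
                .topic("example-topic")     // setter shown in this diff; placeholder topic
                .build();
    }
}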
KafkaMessageFilteringConsumer.java (file name inferred)
@@ -34,13 +34,13 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import static java.lang.String.format;
import static java.lang.Thread.currentThread;
import static java.time.Instant.now;
import static java.util.Collections.singletonList;
import static java.util.Objects.isNull;
@@ -140,13 +140,13 @@ public Message receive(String selector, TestContext testContext, long timeout) {
private List<ConsumerRecord<Object, Object>> findMessageWithTimeout(String topic, long timeout) {
logger.trace("Applied timeout is {} ms", timeout);

- ExecutorService executorService = newSingleThreadExecutor();
+ var executorService = newSingleThreadExecutor();
final Future<List<ConsumerRecord<Object, Object>>> handler = executorService.submit(() -> findMessagesSatisfyingMatcher(topic));

try {
return handler.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ currentThread().interrupt();
throw new CitrusRuntimeException("Thread was interrupted while waiting for Kafka message", e);
} catch (ExecutionException e) {
throw new CitrusRuntimeException(format("Failed to receive message on Kafka topic '%s'", topic), e);
@@ -157,8 +157,8 @@ private List<ConsumerRecord<Object, Object>> findMessageWithTimeout(String topic

throw new MessageTimeoutException(timeout, topic, e);
} finally {
- executorService.shutdownNow();
consumer.unsubscribe();
+ executorService.shutdownNow();
}
}
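
The timeout handling above follows a reusable pattern: run the blocking lookup on a single-thread executor, bound it with Future.get(timeout), and always cancel the worker and release resources in the finally block. A generic sketch (illustrative, not part of this diff; the runtime exceptions stand in for the Citrus-specific ones used above):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWait {

    static <T> T callWithTimeout(Callable<T> blockingCall, long timeoutMillis) throws TimeoutException {
        var executor = Executors.newSingleThreadExecutor();
        Future<T> result = executor.submit(blockingCall);
        try {
            return result.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag before translating
            throw new IllegalStateException("Interrupted while waiting for the result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Blocking call failed", e.getCause());
        } finally {
            executor.shutdownNow(); // interrupts the worker if it is still running
        }
    }
}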
