From bf576e0cac4855cea18951d58c69424d05403df5 Mon Sep 17 00:00:00 2001 From: Timon Borter Date: Mon, 16 Dec 2024 18:51:40 +0100 Subject: [PATCH] chore(#1281): reproducer for parallel access into kafka consumer added a reproducer for https://github.com/citrusframework/citrus/issues/1281. indeed the test which is part of the `KafkaEndpointJavaIT` throws: ```shell java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: Thread-1, id: 124) otherThread(id: 125) ``` --- .../citrusframework/internal/GitHubIssue.java | 15 +++++++++ .../KafkaMessageFilteringConsumer.java | 8 ++--- .../integration/KafkaEndpointJavaIT.java | 33 +++++++++++++++++++ 3 files changed, 52 insertions(+), 4 deletions(-) create mode 100644 core/citrus-base/src/main/java/org/citrusframework/internal/GitHubIssue.java diff --git a/core/citrus-base/src/main/java/org/citrusframework/internal/GitHubIssue.java b/core/citrus-base/src/main/java/org/citrusframework/internal/GitHubIssue.java new file mode 100644 index 0000000000..aa3f92d592 --- /dev/null +++ b/core/citrus-base/src/main/java/org/citrusframework/internal/GitHubIssue.java @@ -0,0 +1,15 @@ +package org.citrusframework.internal; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.SOURCE; + +@Retention(SOURCE) +@Target({METHOD, TYPE}) +public @interface GitHubIssue { + + int value(); +} diff --git a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageFilteringConsumer.java b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageFilteringConsumer.java index d1d4aabf68..0200dfa1fe 100644 --- a/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageFilteringConsumer.java +++ b/endpoints/citrus-kafka/src/main/java/org/citrusframework/kafka/endpoint/KafkaMessageFilteringConsumer.java @@ -34,13 +34,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; import static java.lang.String.format; +import static java.lang.Thread.currentThread; import static java.time.Instant.now; import static java.util.Collections.singletonList; import static java.util.Objects.isNull; @@ -140,13 +140,13 @@ public Message receive(String selector, TestContext testContext, long timeout) { private List> findMessageWithTimeout(String topic, long timeout) { logger.trace("Applied timeout is {} ms", timeout); - ExecutorService executorService = newSingleThreadExecutor(); + var executorService = newSingleThreadExecutor(); final Future>> handler = executorService.submit(() -> findMessagesSatisfyingMatcher(topic)); try { return handler.get(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + currentThread().interrupt(); throw new CitrusRuntimeException("Thread was interrupted while waiting for Kafka message", e); } catch (ExecutionException e) { throw new CitrusRuntimeException(format("Failed to receive message on Kafka topic '%s'", topic), e); @@ -157,8 +157,8 @@ private List> findMessageWithTimeout(String topic throw new MessageTimeoutException(timeout, topic, e); } finally { - executorService.shutdownNow(); consumer.unsubscribe(); + executorService.shutdownNow(); } } diff --git a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointJavaIT.java b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointJavaIT.java index 83e448ef61..3b87f04284 100644 --- a/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointJavaIT.java +++ b/endpoints/citrus-kafka/src/test/java/org/citrusframework/kafka/integration/KafkaEndpointJavaIT.java @@ -20,6 +20,7 @@ import org.citrusframework.annotations.CitrusTest; import org.citrusframework.exceptions.CitrusRuntimeException; import org.citrusframework.exceptions.TestCaseFailedException; +import org.citrusframework.internal.GitHubIssue; import org.citrusframework.kafka.endpoint.KafkaEndpoint; import org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector; import org.citrusframework.kafka.message.KafkaMessage; @@ -33,6 +34,7 @@ import static org.citrusframework.actions.ReceiveMessageAction.Builder.receive; import static org.citrusframework.actions.SendMessageAction.Builder.send; import static org.citrusframework.actions.SleepAction.Builder.sleep; +import static org.citrusframework.container.Parallel.Builder.parallel; import static org.citrusframework.kafka.endpoint.KafkaMessageFilter.kafkaMessageFilter; import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.ENDS_WITH; import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.ValueMatchingStrategy.STARTS_WITH; @@ -274,4 +276,35 @@ public void findKafkaEvent_headerEquals_java_DSL() { ); } + @CitrusTest + @GitHubIssue(1281) + public void parallel_access_thread_safety() { + var body = "parallel_access_thread_safety"; + + var key = "Name"; + + var brother1 = "Elladan"; + var brother2 = "Elrohir"; + + when( + send(kafkaWithRandomConsumerGroupEndpoint) + .message(new KafkaMessage(body).setHeader(key, brother1)) + ); + + when( + send(kafkaWithRandomConsumerGroupEndpoint) + .message(new KafkaMessage(body).setHeader(key, brother2)) + ); + + + then( + parallel() + .actions( + kafkaWithRandomConsumerGroupEndpoint.findKafkaEventHeaderEquals(Duration.ofSeconds(1L), key, brother1) + .body(body), + kafkaWithRandomConsumerGroupEndpoint.findKafkaEventHeaderEquals(Duration.ofSeconds(1L), key, brother2) + .body(body) + ) + ); + } }