KafkaMessageFilteringConsumer#findMessageWithTimeout concurrent access #1281

Open
tschlat opened this issue Dec 12, 2024 · 2 comments · May be fixed by #1282
Labels
Prio: High State: Review If pull-request has been opened an is ready/in review Type: Bug

Comments

@tschlat
Collaborator

tschlat commented Dec 12, 2024

Citrus Version
4.4

Expected behavior
KafkaMessageFilteringConsumer should poll messages without running into exceptions.

Actual behavior
Consumer runs into the following exception:

org.citrusframework.exceptions.TestCaseFailedException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: main, id: 1) otherThread(id: 50)

The reason is that the consumer is queried asynchronously here, in KafkaMessageFilteringConsumer#findMessageWithTimeout:

Future<List<ConsumerRecord<Object, Object>>> handler = executorService.submit(() -> {
    return this.findMessagesSatisfyingMatcher(topic);
});

and then here from the main thread:

        } finally {
            executorService.shutdownNow();
            this.consumer.unsubscribe();
        }

When the consumer is unsubscribed, it is still locked by the asynchronous thread. So before unsubscribing, we need to synchronize here to make sure the consumer is no longer in use.
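
A minimal sketch of the synchronization described above, assuming the worker reacts to interruption. `StubConsumer` is a hypothetical stand-in that only imitates the fail-fast ownership check of Kafka's consumer, and `pollThenUnsubscribe` is an illustrative name; neither is taken from the Citrus sources. The idea is to wait for the executor to terminate before the main thread calls `unsubscribe()`:

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class UnsubscribeAfterWorkerExit {

    /** Hypothetical stand-in: rejects overlapping calls from different threads. */
    static class StubConsumer {
        private final AtomicReference<Thread> owner = new AtomicReference<>();

        private void acquire() {
            if (!owner.compareAndSet(null, Thread.currentThread())) {
                throw new ConcurrentModificationException(
                        "consumer is not safe for multi-threaded access");
            }
        }

        private void release() {
            owner.set(null);
        }

        void poll(long millis) {
            acquire();
            try {
                Thread.sleep(millis); // simulates a blocking poll
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the flag so the loop can exit
            } finally {
                release();
            }
        }

        void unsubscribe() {
            acquire();
            release();
        }
    }

    /** Returns true if unsubscribe() ran without a concurrent-access error. */
    static boolean pollThenUnsubscribe() {
        StubConsumer consumer = new StubConsumer();
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);

        executorService.submit(() -> {
            started.countDown();
            while (!Thread.currentThread().isInterrupted()) {
                consumer.poll(20);
            }
        });

        try {
            started.await();
            executorService.shutdownNow(); // interrupts the polling worker
            // Wait until the worker has actually left the consumer.
            if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
                return false; // worker still running: do not touch the consumer
            }
            consumer.unsubscribe(); // only one thread uses the consumer now
            return true;
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollThenUnsubscribe());
    }
}
```

`shutdownNow()` only requests interruption and returns immediately, which is why the sketch adds `awaitTermination` before the main thread uses the consumer again.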

Test case sample

Please, share the test case (as small as possible) which shows the issue

@bbortt
Collaborator

bbortt commented Dec 15, 2024

Thanks @tschlat. I will fix it by next week.

bbortt added a commit to postfinance/citrus that referenced this issue Dec 16, 2024
…a consumer

added a reproducer for citrusframework#1281.
indeed the test which is part of the `KafkaEndpointJavaIT` throws:

```shell
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: Thread-1, id: 124) otherThread(id: 125)
```
@bbortt
Collaborator

bbortt commented Dec 17, 2024

Just wanted to note this down here...

> When the consumer is unsubscribed, it is still locked by the asynchronous thread. So before unsubscribing, we need to synchronize here to make sure the consumer is no longer in use.

That's not the source of the problem (although the order of shutdown might be wrong in this case). There are numerous threads and only one consumer which is not thread-safe. I've fixed this in the KafkaConsumer class of citrus. But I need some more minutes to double-check backwards compatibility.
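
To illustrate the "many threads, one non-thread-safe consumer" situation, here is a rough sketch of one common remedy: serializing every call through a lock. This is an assumption for illustration only and not necessarily what the linked pull request does. `StubConsumer`, `SerializedConsumer`, and `countFailures` are hypothetical names, and the stub merely imitates the fail-fast check that produces the exception above.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

public class SerializedConsumerAccess {

    /** Hypothetical stand-in: fails fast when two threads overlap inside poll(). */
    static class StubConsumer {
        private final AtomicReference<Thread> owner = new AtomicReference<>();

        void poll() {
            if (!owner.compareAndSet(null, Thread.currentThread())) {
                throw new ConcurrentModificationException(
                        "consumer is not safe for multi-threaded access");
            }
            try {
                Thread.sleep(1); // widen the window in which an overlap could occur
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                owner.set(null);
            }
        }
    }

    /** Hypothetical wrapper: only one thread at a time reaches the delegate. */
    static class SerializedConsumer {
        private final StubConsumer delegate = new StubConsumer();
        private final ReentrantLock lock = new ReentrantLock();

        void poll() {
            lock.lock();
            try {
                delegate.poll();
            } finally {
                lock.unlock();
            }
        }
    }

    /** Runs concurrent polls and returns the number of access errors, or -1 on timeout. */
    static int countFailures(int threads, int callsPerThread) {
        SerializedConsumer consumer = new SerializedConsumer();
        AtomicInteger failures = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    try {
                        consumer.poll();
                    } catch (ConcurrentModificationException e) {
                        failures.incrementAndGet();
                    }
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                return -1;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println(countFailures(4, 25));
    }
}
```

An alternative with the same effect would be to confine all consumer calls to a single dedicated thread; the lock variant is shown only because it is the shorter of the two.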

bbortt added a commit to postfinance/citrus that referenced this issue Dec 17, 2024
…a consumer

added a reproducer for citrusframework#1281.
indeed the test which is part of the `KafkaEndpointJavaIT` throws:

```shell
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: Thread-1, id: 124) otherThread(id: 125)
```
@bbortt bbortt linked a pull request Dec 17, 2024 that will close this issue
@bbortt bbortt added Type: Bug Prio: High State: Review If pull-request has been opened an is ready/in review labels Dec 17, 2024