-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka. timeout #1250
Comments
is it possible that both your tests use the same |
@bbortt maybe, how set selector by message body for Kafka? jsonPath works? .selector(Collections.singletonMap("jsonPath:$.data.commandId", id)) |
@akuz0 no, I've focused on header-based filtering for now. I can take this as a feature request, if you need it? I think JSON-path-based value (message body) matching sounds reasonable. |
Citrus Version
4.3.3
Question
test case
Why can't I subtract events? And why poll it is called once?
Maybe I'm doing something wrong.
What I've tried so far
If you run a group of tests, all the tests after the first one fall
I think, problem
Group coordinator MY_KAFKA_IP (id: 2147483646 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted.
log
2024-11-02 20:49:31 DEBUG AbstractFetch:194 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Fetch read_uncommitted at offset 114 for partition ePC-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=116, lastStableOffset=116, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=661, buffer=java.nio.HeapByteBuffer[pos=0 lim=661 cap=664])) 2024-11-02 20:49:31 DEBUG AbstractFetch:278 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Removing pending request for fetch session: 613051 for node: IP:9092 (id: 1 rack: null) 2024-11-02 20:49:31 DEBUG NetworkClient:954 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Received OFFSET_COMMIT response from node 2147483646 for request with header RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=8, clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, correlationId=16, headerVersion=2): OffsetCommitResponseData(throttleTimeMs=0, topics=[OffsetCommitResponseTopic(name='ePC', partitions=[OffsetCommitResponsePartition(partitionIndex=0, errorCode=0)])]) 2024-11-02 20:51:28 DEBUG ConsumerCoordinator:1326 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Committed offset 114 for partition -0 2024-11-02 20:51:28 INFO ConsumerCoordinator:999 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Group coordinator MY_KAFKA_IP (id: 2147483646 rack: null) is unavailable or invalid due to cause: session timed out without receiving a heartbeat response. isDisconnected: false. Rediscovery will be attempted. 2024-11-02 20:51:28 INFO ConsumerCoordinator:1012 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Requesting disconnect from last known coordinator IP:9092 (id: 2147483646 rack: null) 2024-11-02 20:51:28 INFO NetworkClient:343 - [Consumer clientId=citrus_kafka_consumer_008e7fa5-49f5-422d-9ef2-011d009f913a, groupId=citrus_kafka_group] Client requested disconnect from node 2147483646
Additional information
If in debug mode, when citrus send throw, i use poll and see my event. Why poll it is called once?
for example while
https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
I can use while in code, but i have timeout exception instante
The text was updated successfully, but these errors were encountered: