Skip to content

Commit

Permalink
fix#1065
Browse files Browse the repository at this point in the history
fter few days of experiments found the root cause.

Closes confluentinc#1065

Few details on the fix:
KafkaConsumerManager's "Consumer Expiration Thread" acquires a lock on (KafkaConsumerManager.this instance) and calls expired method of KafkaConsumerState.java. The Consumer Expiration Thread sometimes takes time to get the lock on synchronized expired because there can be a consumer consuming records that has a lock on the KafkaConsumerState instance which may take longer to release depending on the time taken in the IO to fetch the records from kafka broker. The situation can get worse with more number of active consumers fetching records.

The two locks for expired and updateExpiration are not required.

Thread dump snippets
"qtp1551629761-29" prio=5 Id=29 BLOCKED on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 owned by "Consumer Expiration Thread" Id=47
 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.getConsumerInstance(KafkaConsumerManager.java:669)
 -  blocked on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.getConsumerInstance(KafkaConsumerManager.java:679)

"qtp1551629761-35" prio=5 Id=35 BLOCKED on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 owned by "Consumer Expiration Thread" Id=47
 at app//io.confluent.kafkarest.v2.KafkaConsumerManager.createConsumer(KafkaConsumerManager.java:180)
 -  blocked on io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4 

"Consumer Expiration Thread" daemon prio=5 Id=47 BLOCKED on io.confluent.kafkarest.v2.JsonKafkaConsumerState@4d2bbeda owned by "pool-4-thread-13" Id=153
 at app//io.confluent.kafkarest.v2.KafkaConsumerState.expired(KafkaConsumerState.java:345)
 -  blocked on io.confluent.kafkarest.v2.JsonKafkaConsumerState@4d2bbeda at app//io.confluent.kafkarest.v2.KafkaConsumerManager$ExpirationThread.run(KafkaConsumerManager.java:761)
 -  locked io.confluent.kafkarest.v2.KafkaConsumerManager@3a09eed4

"pool-4-thread-13" prio=5 Id=153 RUNNABLE (in native)
 at [email protected]/sun.nio.ch.EPoll.wait(Native Method)
 at [email protected]/sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:120)
 at [email protected]/sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:124)
 -  locked sun.nio.ch.Util$2@1458190c
 -  locked sun.nio.ch.EPollSelectorImpl@3ebca1e6
 at [email protected]/sun.nio.ch.SelectorImpl.select(SelectorImpl.java:136)
 at app//org.apache.kafka.common.network.Selector.select(Selector.java:869)
 at app//org.apache.kafka.common.network.Selector.poll(Selector.java:465)
 at app//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
 at app//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
  • Loading branch information
swapnilgawade16 authored Sep 23, 2022
1 parent 17c40d8 commit dfa1373
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,13 +341,13 @@ synchronized Optional<Long> getOffsetForTime(String topic, int partition, Instan
.map(OffsetAndTimestamp::offset);
}

public synchronized boolean expired(Instant now) {
public boolean expired(Instant now) {
synchronized (this.expirationLock) {
return !expiration.isAfter(now);
}
}

public synchronized void updateExpiration() {
public void updateExpiration() {
synchronized (this.expirationLock) {
this.expiration = clock.instant().plus(consumerInstanceTimeout);
}
Expand Down

0 comments on commit dfa1373

Please sign in to comment.