Skip to content

Commit

Permalink
KAFKA-17954: Error getting oldest-iterator-open-since-ms from JMX (#1…
Browse files Browse the repository at this point in the history
…7713)

The thread that evaluates the gauge for the oldest-iterator-open-since-ms runs concurrently
with threads that open/close iterators (stream threads and interactive query threads). This PR
fixed a race condition between `openIterators.isEmpty()` and `openIterators.first()`, by catching
a potential exception. Because we except the race condition to be rare, we rather catch the
exception in favor of introducing a guard via locking.

Reviewers: Matthias J. Sax <[email protected]>, Anna Sophie Blee-Goldman <[email protected]>
  • Loading branch information
nicktelford authored Nov 19, 2024
1 parent 8f63a77 commit 57299cf
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -154,7 +155,13 @@ private void registerMetrics() {
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
(config, now) -> {
try {
return openIterators.isEmpty() ? null : openIterators.first().startTimestamp();
} catch (final NoSuchElementException ignored) {
return null;
}
}
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -124,7 +125,13 @@ private void registerMetrics() {
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
(config, now) -> {
try {
return openIterators.isEmpty() ? null : openIterators.first().startTimestamp();
} catch (final NoSuchElementException ignored) {
return null;
}
}
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -142,7 +143,13 @@ private void registerMetrics() {
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
(config, now) -> {
try {
return openIterators.isEmpty() ? null : openIterators.first().startTimestamp();
} catch (final NoSuchElementException ignored) {
return null;
}
}
);
}

Expand Down

0 comments on commit 57299cf

Please sign in to comment.