From fc720d33a03e5b40a31b82c4baddb8a23b6e8f9d Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Sat, 31 Aug 2024 05:02:06 -0700 Subject: [PATCH] MINOR: remove get prefix for internal state methods (#17053) Reviewers: Chia-Ping Tsai --- .../kstream/internals/KStreamKStreamJoin.java | 6 +-- .../internals/KStreamKStreamJoinLeftSide.java | 4 +- .../KStreamKStreamJoinRightSide.java | 4 +- .../internals/AbstractReadWriteDecorator.java | 2 +- .../internals/DefaultStateUpdater.java | 4 +- .../internals/GlobalProcessorContextImpl.java | 6 +-- .../internals/ProcessorContextImpl.java | 4 +- ...tDualSchemaRocksDBSegmentedBytesStore.java | 4 +- .../AbstractRocksDBSegmentedBytesStore.java | 6 +-- .../state/internals/AbstractSegments.java | 2 +- .../state/internals/LeftOrRightValue.java | 4 +- .../internals/LeftOrRightValueSerializer.java | 8 ++-- .../internals/LogicalKeyValueSegment.java | 14 +++--- .../internals/LogicalKeyValueSegments.java | 2 +- .../internals/LogicalSegmentIterator.java | 6 +-- .../state/internals/MeteredKeyValueStore.java | 5 +- .../state/internals/MeteredSessionStore.java | 2 +- .../MeteredTimestampedKeyValueStore.java | 9 ++-- .../MeteredVersionedKeyValueStore.java | 2 +- .../state/internals/MeteredWindowStore.java | 5 +- .../streams/state/internals/RocksDBStore.java | 2 +- .../internals/RocksDBVersionedStore.java | 48 +++++++++---------- ...cksDBVersionedStoreRestoreWriteBuffer.java | 10 ++-- ...DBVersionedStoreSegmentValueFormatter.java | 8 ++-- .../streams/state/internals/Segments.java | 2 +- .../state/internals/StoreQueryUtils.java | 4 +- .../internals/TimestampedKeyAndJoinSide.java | 4 +- .../TimestampedKeyAndJoinSideSerializer.java | 4 +- .../internals/StreamsMetadataStateTest.java | 1 - .../state/internals/KeyValueSegmentsTest.java | 14 +++--- .../LogicalKeyValueSegmentsTest.java | 10 ++-- ...rsionedStoreSegmentValueFormatterTest.java | 4 +- .../internals/TimestampedSegmentsTest.java | 14 +++--- 33 files changed, 110 insertions(+), 114 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index 745f7038257ac..9ba8f316271dd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -207,7 +207,7 @@ private void emitNonJoinedOuterRecords(final KeyValueStore, LeftOrRightValue> nextKeyValue = it.next(); final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = nextKeyValue.key; - sharedTimeTracker.minTime = timestampedKeyAndJoinSide.getTimestamp(); + sharedTimeTracker.minTime = timestampedKeyAndJoinSide.timestamp(); if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) { // if windows are open for both joinSides we can break since there are no more candidates to emit break; @@ -250,8 +250,8 @@ private void emitNonJoinedOuterRecords(final KeyValueStore record, final TimestampedKeyAndJoinSide timestampedKeyAndJoinSide, final LeftOrRightValue leftOrRightValue) { - final K key = timestampedKeyAndJoinSide.getKey(); - final long timestamp = timestampedKeyAndJoinSide.getTimestamp(); + final K key = timestampedKeyAndJoinSide.key(); + final long timestamp = timestampedKeyAndJoinSide.timestamp(); final VThis thisValue = getThisValue(leftOrRightValue); final VOther otherValue = getOtherValue(leftOrRightValue); final VOut nullJoinedValue = joiner.apply(key, thisValue, otherValue); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java index 3b4ee5b33930a..df03e39b8c581 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinLeftSide.java @@ -60,12 +60,12 @@ public TimestampedKeyAndJoinSide makeOtherKey(final K key, final long timesta @Override public VLeft getThisValue(final LeftOrRightValue leftOrRightValue) { - return leftOrRightValue.getLeftValue(); + return leftOrRightValue.leftValue(); } @Override public VRight getOtherValue(final LeftOrRightValue leftOrRightValue) { - return leftOrRightValue.getRightValue(); + return leftOrRightValue.rightValue(); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java index e4bcfe4e10517..ec29d5f12b820 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinRightSide.java @@ -59,12 +59,12 @@ public TimestampedKeyAndJoinSide makeOtherKey(final K key, final long timesta @Override public VRight getThisValue(final LeftOrRightValue leftOrRightValue) { - return leftOrRightValue.getRightValue(); + return leftOrRightValue.rightValue(); } @Override public VLeft getOtherValue(final LeftOrRightValue leftOrRightValue) { - return leftOrRightValue.getLeftValue(); + return leftOrRightValue.leftValue(); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java index 11433af42cfbf..353c26ec25389 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java @@ -53,7 +53,7 @@ public void close() { throw new UnsupportedOperationException(ERROR_MESSAGE); } - static StateStore readWriteStore(final StateStore store) { + static StateStore wrapWithReadWriteStore(final StateStore store) { if (store instanceof TimestampedKeyValueStore) { return new TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore) store); } else if (store instanceof VersionedKeyValueStore) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 2e4bb85db7cbd..addef5a9f1565 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -200,7 +200,7 @@ private void runOnce() { private void performActionsOnTasks() { tasksAndActionsLock.lock(); try { - for (final TaskAndAction taskAndAction : getTasksAndActions()) { + for (final TaskAndAction taskAndAction : tasksAndActions()) { final Action action = taskAndAction.action(); switch (action) { case ADD: @@ -458,7 +458,7 @@ private void clearUpdatingAndPausedTasks() { changelogReader.clear(); } - private List getTasksAndActions() { + private List tasksAndActions() { final List tasksAndActionsToProcess = new ArrayList<>(tasksAndActions); tasksAndActions.clear(); return tasksAndActionsToProcess; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 74148cd21b185..828ae3a0a7968 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -34,7 +34,7 @@ import java.time.Duration; -import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.readWriteStore; +import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore; public class GlobalProcessorContextImpl extends AbstractProcessorContext { @@ -60,7 +60,7 @@ protected StateManager stateManager() { @Override public S getStateStore(final String name) { final StateStore store = stateManager.globalStore(name); - return (S) readWriteStore(store); + return (S) wrapWithReadWriteStore(store); } @SuppressWarnings("unchecked") @@ -156,4 +156,4 @@ public void transitionToStandby(final ThreadCache newCache) { public void registerCacheFlushListener(final String namespace, final DirtyEntryFlushListener listener) { cache.addDirtyEntryFlushListener(namespace, listener); } -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 91ee78b124637..dc946ba3b6c2b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -47,7 +47,7 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; import static org.apache.kafka.streams.processor.internals.AbstractReadOnlyDecorator.getReadOnlyStore; -import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.readWriteStore; +import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.wrapWithReadWriteStore; public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { // the below are null for standby tasks @@ -182,7 +182,7 @@ public S getStateStore(final String name) { } final StateStore store = stateManager.store(name); - return (S) readWriteStore(store); + return (S) wrapWithReadWriteStore(store); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java index f2d789be2b18f..e55d8452fae6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java @@ -113,7 +113,7 @@ public KeyValueIterator backwardAll() { public void remove(final Bytes rawBaseKey) { final long timestamp = baseKeySchema.segmentTimestamp(rawBaseKey); observedStreamTime = Math.max(observedStreamTime, timestamp); - final S segment = segments.getSegmentForTimestamp(timestamp); + final S segment = segments.segmentForTimestamp(timestamp); if (segment == null) { return; } @@ -227,7 +227,7 @@ public byte[] get(final Bytes rawKey) { } } - final S segment = segments.getSegmentForTimestamp(timestampFromRawKey); + final S segment = segments.segmentForTimestamp(timestampFromRawKey); if (segment == null) { return null; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java index 9aee1de871e85..f5b4366ae9839 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java @@ -239,7 +239,7 @@ public KeyValueIterator backwardFetchAll(final long timeFrom, public void remove(final Bytes key) { final long timestamp = keySchema.segmentTimestamp(key); observedStreamTime = Math.max(observedStreamTime, timestamp); - final S segment = segments.getSegmentForTimestamp(timestamp); + final S segment = segments.segmentForTimestamp(timestamp); if (segment == null) { return; } @@ -249,7 +249,7 @@ public void remove(final Bytes key) { @Override public void remove(final Bytes key, final long timestamp) { final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key, timestamp); - final S segment = segments.getSegmentForTimestamp(timestamp); + final S segment = segments.segmentForTimestamp(timestamp); if (segment != null) { segment.deleteRange(keyBytes, keyBytes); } @@ -281,7 +281,7 @@ public byte[] get(final Bytes key) { key.toString(), timestampFromKey, observedStreamTime - retentionPeriod + 1); return null; } - final S segment = segments.getSegmentForTimestamp(timestampFromKey); + final S segment = segments.segmentForTimestamp(timestampFromKey); if (segment == null) { return null; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java index 24020e9d2c532..5611fe99d24e5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java @@ -75,7 +75,7 @@ public String segmentName(final long segmentId) { } @Override - public S getSegmentForTimestamp(final long timestamp) { + public S segmentForTimestamp(final long timestamp) { return segments.get(segmentId(timestamp)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java index 5f5e0ab7dd717..869a524bee2e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java @@ -63,11 +63,11 @@ public static LeftOrRightValue makeRightValue(final V2 rightVal return new LeftOrRightValue<>(null, rightValue); } - public V1 getLeftValue() { + public V1 leftValue() { return leftValue; } - public V2 getRightValue() { + public V2 rightValue() { return rightValue; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java index 8f3c47d14d1a1..1c64c29fd5ac8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerializer.java @@ -65,9 +65,9 @@ public byte[] serialize(final String topic, final LeftOrRightValue data) return null; } - final byte[] rawValue = (data.getLeftValue() != null) - ? leftSerializer.serialize(topic, data.getLeftValue()) - : rightSerializer.serialize(topic, data.getRightValue()); + final byte[] rawValue = (data.leftValue() != null) + ? leftSerializer.serialize(topic, data.leftValue()) + : rightSerializer.serialize(topic, data.rightValue()); if (rawValue == null) { return null; @@ -75,7 +75,7 @@ public byte[] serialize(final String topic, final LeftOrRightValue data) return ByteBuffer .allocate(1 + rawValue.length) - .put((byte) (data.getLeftValue() != null ? 1 : 0)) + .put((byte) (data.leftValue() != null ? 1 : 0)) .put(rawValue) .array(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java index 77e788d879f70..6b9cd747de245 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java @@ -89,7 +89,7 @@ public synchronized void destroy() { + "an entire store is closed, via the close() method rather than destroy()."); } - final Bytes keyPrefix = prefixKeyFormatter.getPrefix(); + final Bytes keyPrefix = prefixKeyFormatter.prefix(); // this deleteRange() call deletes all entries with the given prefix, because the // deleteRange() implementation calls Bytes.increment() in order to make keyTo inclusive @@ -192,8 +192,8 @@ private synchronized byte[] get(final Bytes key, final Optional snapsh } } - public Snapshot getSnapshot() { - return physicalStore.getSnapshot(); + public Snapshot snapshot() { + return physicalStore.snapshot(); } public void releaseSnapshot(final Snapshot snapshot) { @@ -204,14 +204,14 @@ public void releaseSnapshot(final Snapshot snapshot) { public synchronized KeyValueIterator range(final Bytes from, final Bytes to) { // from bound is inclusive. if the provided bound is null, replace with prefix final Bytes fromBound = from == null - ? prefixKeyFormatter.getPrefix() + ? prefixKeyFormatter.prefix() : prefixKeyFormatter.addPrefix(from); // to bound is inclusive. if the provided bound is null, replace with the next prefix. // this requires potentially filtering out the element corresponding to the next prefix // with empty bytes from the returned iterator. this filtering is accomplished by // passing the prefix filter into StrippedPrefixKeyValueIteratorAdapter(). final Bytes toBound = to == null - ? incrementWithoutOverflow(prefixKeyFormatter.getPrefix()) + ? incrementWithoutOverflow(prefixKeyFormatter.prefix()) : prefixKeyFormatter.addPrefix(to); final KeyValueIterator iteratorWithKeyPrefixes = physicalStore.range( fromBound, @@ -226,7 +226,7 @@ public synchronized KeyValueIterator range(final Bytes from, fina @Override public synchronized KeyValueIterator all() { final KeyValueIterator iteratorWithKeyPrefixes = physicalStore.prefixScan( - prefixKeyFormatter.getPrefix(), + prefixKeyFormatter.prefix(), new BytesSerializer(), openIterators); return new StrippedPrefixKeyValueIteratorAdapter( @@ -288,7 +288,7 @@ private byte[] removePrefix(final byte[] keyWithPrefix) { return rawKey; } - Bytes getPrefix() { + Bytes prefix() { return Bytes.wrap(prefix); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java index 85985f9d373ba..bcbeb4689b388 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java @@ -29,7 +29,7 @@ * Regular segments with {@code segmentId >= 0} expire according to the specified * retention period. "Reserved" segments with {@code segmentId < 0} do not expire * and are completely separate from regular segments in that methods such as - * {@link #getSegmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)}, + * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)}, * {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)}, * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)} * only return regular segments and not reserved segments. The methods {@link #flush()} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java index 3acdeac227397..d31d9942af639 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java @@ -95,16 +95,16 @@ private boolean maybeFillIterator() { // fact all use the same physical RocksDB under-the-hood. this.snapshotOwner = segment; // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency) - this.snapshot = snapshotOwner.getSnapshot(); + this.snapshot = snapshotOwner.snapshot(); } final byte[] rawSegmentValue = segment.get(key, snapshot); if (rawSegmentValue != null) { // this segment contains record(s) with the specified key if (segment.id() == -1) { // this is the latestValueStore - final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue); + final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.timestamp(rawSegmentValue); if (recordTimestamp <= toTime) { // latest value satisfies timestamp bound - queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp)); + queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.value(rawSegmentValue), recordTimestamp)); } } else { // this segment contains records with the specified key and time range diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index ba3829cda562d..6816910758e65 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -61,7 +61,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; -import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue; /** * A Metered {@link KeyValueStore} wrapper that is used for recording operation metrics, and hence its @@ -263,7 +262,7 @@ private QueryResult runRangeQuery(final Query query, final KeyValueIterator resultIterator = new MeteredKeyValueTimestampedIterator( iterator, getSensor, - getDeserializeValue(serdes, wrapped()) + StoreQueryUtils.deserializeValue(serdes, wrapped()) ); final QueryResult> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult( @@ -289,7 +288,7 @@ private QueryResult runKeyQuery(final Query query, final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { - final Function deserializer = getDeserializeValue(serdes, wrapped()); + final Function deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped()); final V value = deserializer.apply(rawResult.getResult()); final QueryResult typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 376d74e7b0d49..ad20af61bda68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -455,7 +455,7 @@ private QueryResult runRangeQuery(final Query query, iteratorDurationSensor, streamsMetrics, serdes::keyFrom, - StoreQueryUtils.getDeserializeValue(serdes, wrapped()), + StoreQueryUtils.deserializeValue(serdes, wrapped()), time, numOpenIterators, openIterators diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index 61e6533fb8d14..6f90ef56d868e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -44,7 +44,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; -import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue; /** * A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its @@ -186,7 +185,7 @@ private QueryResult runTimestampedKeyQuery(final Query query, final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { - final Function> deserializer = getDeserializeValue(serdes, wrapped()); + final Function> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped()); final ValueAndTimestamp valueAndTimestamp = deserializer.apply(rawResult.getResult()); final QueryResult> typedQueryResult = InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, valueAndTimestamp); @@ -224,7 +223,7 @@ private QueryResult runTimestampedRangeQuery(final Query query, final KeyValueIterator> resultIterator = (KeyValueIterator>) new MeteredTimestampedKeyValueStoreIterator( iterator, getSensor, - getDeserializeValue(serdes, wrapped()), + StoreQueryUtils.deserializeValue(serdes, wrapped()), false ); final QueryResult>> typedQueryResult = @@ -251,7 +250,7 @@ private QueryResult runKeyQuery(final Query query, final QueryResult rawResult = wrapped().query(rawKeyQuery, positionBound, config); if (rawResult.isSuccess()) { - final Function> deserializer = getDeserializeValue(serdes, wrapped()); + final Function> deserializer = StoreQueryUtils.deserializeValue(serdes, wrapped()); final ValueAndTimestamp valueAndTimestamp = deserializer.apply(rawResult.getResult()); final V plainValue = valueAndTimestamp == null ? null : valueAndTimestamp.value(); final QueryResult typedQueryResult = @@ -290,7 +289,7 @@ private QueryResult runRangeQuery(final Query query, final KeyValueIterator resultIterator = new MeteredTimestampedKeyValueStoreIterator( iterator, getSensor, - getDeserializeValue(serdes, wrapped()), + StoreQueryUtils.deserializeValue(serdes, wrapped()), true ); final QueryResult> typedQueryResult = diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index b331bb6ac8f7c..acdc379664653 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -267,7 +267,7 @@ private QueryResult runMultiVersionedKeyQuery(final Query query, final rawResult.getResult(), iteratorDurationSensor, time, - StoreQueryUtils.getDeserializeValue(plainValueSerdes), + StoreQueryUtils.deserializeValue(plainValueSerdes), numOpenIterators, openIterators ); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index b2a84ce002da8..05d423a985c83 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -56,7 +56,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; -import static org.apache.kafka.streams.state.internals.StoreQueryUtils.getDeserializeValue; public class MeteredWindowStore extends WrappedStateStore, Windowed, V> @@ -417,7 +416,7 @@ private QueryResult runRangeQuery(final Query query, iteratorDurationSensor, streamsMetrics, serdes::keyFrom, - getDeserializeValue(serdes, wrapped()), + StoreQueryUtils.deserializeValue(serdes, wrapped()), time, numOpenIterators, openIterators @@ -469,7 +468,7 @@ private QueryResult runKeyQuery(final Query query, fetchSensor, iteratorDurationSensor, streamsMetrics, - getDeserializeValue(serdes, wrapped()), + StoreQueryUtils.deserializeValue(serdes, wrapped()), time, numOpenIterators, openIterators diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index b643fe1c7a776..52c193c86e0c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -365,7 +365,7 @@ private void validateStoreOpen() { } } - public Snapshot getSnapshot() { + public Snapshot snapshot() { return db.getSnapshot(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java index 21c56a6328fde..eaaed6f30e373 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java @@ -191,8 +191,8 @@ public VersionedRecord get(final Bytes key) { final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); if (rawLatestValueAndTimestamp != null) { return new VersionedRecord<>( - LatestValueFormatter.getValue(rawLatestValueAndTimestamp), - LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp) + LatestValueFormatter.value(rawLatestValueAndTimestamp), + LatestValueFormatter.timestamp(rawLatestValueAndTimestamp) ); } else { return null; @@ -210,12 +210,12 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) { // still be returned (i.e., the latest record version per key never expires). final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); if (rawLatestValueAndTimestamp != null) { - final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + final long latestTimestamp = LatestValueFormatter.timestamp(rawLatestValueAndTimestamp); if (latestTimestamp <= asOfTimestamp) { // latest value satisfies timestamp bound return new VersionedRecord<>( - LatestValueFormatter.getValue(rawLatestValueAndTimestamp), - latestTimestamp + LatestValueFormatter.value(rawLatestValueAndTimestamp), + latestTimestamp ); } } @@ -230,9 +230,9 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) { // first check the latest value store final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); if (rawLatestValueAndTimestamp != null) { - final long latestTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + final long latestTimestamp = LatestValueFormatter.timestamp(rawLatestValueAndTimestamp); if (latestTimestamp <= asOfTimestamp) { - return new VersionedRecord<>(LatestValueFormatter.getValue(rawLatestValueAndTimestamp), latestTimestamp); + return new VersionedRecord<>(LatestValueFormatter.value(rawLatestValueAndTimestamp), latestTimestamp); } } @@ -241,14 +241,14 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) { for (final LogicalKeyValueSegment segment : segments) { final byte[] rawSegmentValue = segment.get(key); if (rawSegmentValue != null) { - final long nextTs = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue); + final long nextTs = RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue); if (nextTs <= asOfTimestamp) { // this segment contains no data for the queried timestamp, so earlier segments // cannot either return null; } - if (RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue) > asOfTimestamp) { + if (RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(rawSegmentValue) > asOfTimestamp) { // the segment only contains data for after the queried timestamp. skip and // continue the search to earlier segments. as an optimization, this code // could be updated to skip forward to the segment containing the minTimestamp @@ -474,7 +474,7 @@ interface VersionedStoreClient { /** * @return the contents of the latest value store, for the given key */ - byte[] getLatestValue(Bytes key); + byte[] latestValue(Bytes key); /** * Puts the provided key and value into the latest value store. @@ -496,7 +496,7 @@ interface VersionedStoreClient { * timestamp bound, in reverse order by segment id (and time), i.e., such that * the most recent segment is first */ - List getReverseSegments(long timestampFrom); + List reversedSegments(long timestampFrom); /** * @return the segment id associated with the provided timestamp @@ -510,7 +510,7 @@ interface VersionedStoreClient { class RocksDBVersionedStoreClient implements VersionedStoreClient { @Override - public byte[] getLatestValue(final Bytes key) { + public byte[] latestValue(final Bytes key) { return latestValueStore.get(key); } @@ -530,7 +530,7 @@ public LogicalKeyValueSegment getOrCreateSegmentIfLive(final long segmentId, fin } @Override - public List getReverseSegments(final long timestampFrom) { + public List reversedSegments(final long timestampFrom) { return segmentStores.segments(timestampFrom, Long.MAX_VALUE, false); } @@ -668,9 +668,9 @@ private PutStatus maybePutToLatestValueStore( // that the segment should be inserted into the latest value store. long foundTs = SENTINEL_TIMESTAMP; - final byte[] rawLatestValueAndTimestamp = versionedStoreClient.getLatestValue(key); + final byte[] rawLatestValueAndTimestamp = versionedStoreClient.latestValue(key); if (rawLatestValueAndTimestamp != null) { - final long latestValueStoreTimestamp = LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp); + final long latestValueStoreTimestamp = LatestValueFormatter.timestamp(rawLatestValueAndTimestamp); if (timestamp >= latestValueStoreTimestamp) { // new record belongs in the latest value store if (timestamp > latestValueStoreTimestamp) { @@ -692,7 +692,7 @@ private PutStatus maybePutToLatestValueStore( // is expired.) so, there is nothing to do for this step if `segment == null`, // but we do still update the latest value store with the new record below. if (segment != null) { - final byte[] rawValueToMove = LatestValueFormatter.getValue(rawLatestValueAndTimestamp); + final byte[] rawValueToMove = LatestValueFormatter.value(rawLatestValueAndTimestamp); final byte[] rawSegmentValue = segment.get(key); if (rawSegmentValue == null) { segment.put( @@ -734,11 +734,11 @@ private PutStatus maybePutToSegments( // initialize with current foundTs value long foundTs = prevFoundTs; - final List segments = versionedStoreClient.getReverseSegments(timestamp); + final List segments = versionedStoreClient.reversedSegments(timestamp); for (final T segment : segments) { final byte[] rawSegmentValue = segment.get(key); if (rawSegmentValue != null) { - final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue); + final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue); if (foundNextTs <= timestamp) { // this segment (and all earlier segments) does not contain records affected by // this put. insert into the segment specified by foundTs (i.e., the next @@ -746,7 +746,7 @@ private PutStatus maybePutToSegments( return new PutStatus(false, foundTs); } - final long foundMinTs = RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue); + final long foundMinTs = RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(rawSegmentValue); if (foundMinTs <= timestamp) { // the record being inserted belongs in this segment. // insert and conclude the procedure. @@ -906,7 +906,7 @@ private long finishPut( ); } else { // insert as latest, since foundTs = sentinel means nothing later exists - if (RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue) == timestamp) { + if (RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue) == timestamp) { // next timestamp equal to put() timestamp already represents a tombstone, // so no additional insertion is needed in this case return foundTs; @@ -914,7 +914,7 @@ private long finishPut( final SegmentValue segmentValue = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue); segmentValue.insertAsLatest( - RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue), + RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue), timestamp, null ); @@ -948,7 +948,7 @@ private long finishPut( .serialize() ); } else { - final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue); + final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(rawSegmentValue); if (foundNextTs <= timestamp) { // insert as latest. this case is possible if the found segment is "degenerate" // (cf RocksDBVersionedStoreSegmentValueFormatter.java for details) as older @@ -980,7 +980,7 @@ static final class LatestValueFormatter { * @return the timestamp, from the latest value store value bytes (representing value * and timestamp) */ - static long getTimestamp(final byte[] rawLatestValueAndTimestamp) { + static long timestamp(final byte[] rawLatestValueAndTimestamp) { return ByteBuffer.wrap(rawLatestValueAndTimestamp).getLong(); } @@ -988,7 +988,7 @@ static long getTimestamp(final byte[] rawLatestValueAndTimestamp) { * @return the actual record value, from the latest value store value bytes (representing * value and timestamp) */ - static byte[] getValue(final byte[] rawLatestValueAndTimestamp) { + static byte[] value(final byte[] rawLatestValueAndTimestamp) { final byte[] rawValue = new byte[rawLatestValueAndTimestamp.length - TIMESTAMP_SIZE]; System.arraycopy(rawLatestValueAndTimestamp, TIMESTAMP_SIZE, rawValue, 0, rawValue.length); return rawValue; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java index 1cd37b6ab9051..bd82465ec4907 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java @@ -90,7 +90,7 @@ void flush() throws RocksDBException { // flush segments first, as this is consistent with the store always writing to // older segments/stores before later ones try (final WriteBatch segmentsBatch = new WriteBatch()) { - final List allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE); + final List allSegments = restoreClient.reversedSegments(Long.MIN_VALUE); if (allSegments.size() > 0) { // collect entries into write batch for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) { @@ -186,12 +186,12 @@ Map getAll() { private class RocksDBVersionedStoreRestoreClient implements VersionedStoreClient { @Override - public byte[] getLatestValue(final Bytes key) { + public byte[] latestValue(final Bytes key) { final Optional bufferValue = latestValueWriteBuffer.get(key); if (bufferValue != null) { return bufferValue.orElse(null); } - return dbClient.getLatestValue(key); + return dbClient.latestValue(key); } @Override @@ -221,13 +221,13 @@ public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segm } @Override - public List getReverseSegments(final long timestampFrom) { + public List reversedSegments(final long timestampFrom) { // head and not tail because the map is sorted in reverse order final long segmentFrom = segmentIdForTimestamp(timestampFrom); final List bufferSegments = new ArrayList<>(segmentsWriteBuffer.headMap(segmentFrom, true).values()); - final List dbSegments = dbClient.getReverseSegments(timestampFrom); + final List dbSegments = dbClient.reversedSegments(timestampFrom); // merge segments from db with segments from write buffer final List allSegments = new ArrayList<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java index 0b22ada12f68c..9b60bd4f06375 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java @@ -101,14 +101,14 @@ final class RocksDBVersionedStoreSegmentValueFormatter { /** * @return the validTo timestamp of the latest record in the provided segment */ - static long getNextTimestamp(final byte[] segmentValue) { + static long nextTimestamp(final byte[] segmentValue) { return ByteBuffer.wrap(segmentValue).getLong(0); } /** * @return the (validFrom) timestamp of the earliest record in the provided segment. */ - static long getMinTimestamp(final byte[] segmentValue) { + static long minTimestamp(final byte[] segmentValue) { return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE); } @@ -271,9 +271,9 @@ private static class PartiallyDeserializedSegmentValue implements SegmentValue { private PartiallyDeserializedSegmentValue(final byte[] segmentValue) { this.segmentValue = segmentValue; this.nextTimestamp = - RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue); + RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(segmentValue); this.minTimestamp = - RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue); + RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(segmentValue); this.isDegenerate = nextTimestamp == minTimestamp; resetDeserHelpers(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java index 7e50b98452165..a3ef2426c3cc7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java @@ -26,7 +26,7 @@ interface Segments { String segmentName(final long segmentId); - S getSegmentForTimestamp(final long timestamp); + S segmentForTimestamp(final long timestamp); S getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java index fa2081ad25bcd..bc7a52a48141b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java @@ -410,7 +410,7 @@ private static QueryResult runMultiVersionedKeyQuery(final Query query } @SuppressWarnings({"unchecked", "rawtypes"}) - public static Function getDeserializeValue(final StateSerdes serdes, final StateStore wrapped) { + public static Function deserializeValue(final StateSerdes serdes, final StateStore wrapped) { final Serde valueSerde = serdes.valueSerde(); final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isAdapter(wrapped); final Deserializer deserializer; @@ -435,7 +435,7 @@ public static boolean isAdapter(final StateStore stateStore) { } @SuppressWarnings({"unchecked", "rawtypes"}) - public static Function, VersionedRecord> getDeserializeValue(final StateSerdes serdes) { + public static Function, VersionedRecord> deserializeValue(final StateSerdes serdes) { final Serde valueSerde = serdes.valueSerde(); final Deserializer deserializer = valueSerde.deserializer(); return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent() ? new VersionedRecord<>(deserializer.deserialize(serdes.topic(), rawVersionedRecord.value()), diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java index 3b799e815399f..127835438dfd2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSide.java @@ -65,11 +65,11 @@ public boolean isLeftSide() { return leftSide; } - public K getKey() { + public K key() { return key; } - public long getTimestamp() { + public long timestamp() { return timestamp; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java index 801c417e1edce..1c4e367505875 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyAndJoinSideSerializer.java @@ -56,8 +56,8 @@ public void configure(final Map configs, final boolean isKey) { @Override public byte[] serialize(final String topic, final TimestampedKeyAndJoinSide data) { final byte boolByte = (byte) (data.isLeftSide() ? 1 : 0); - final byte[] keyBytes = keySerializer.serialize(topic, data.getKey()); - final byte[] timestampBytes = timestampSerializer.serialize(topic, data.getTimestamp()); + final byte[] keyBytes = keySerializer.serialize(topic, data.key()); + final byte[] timestampBytes = timestampSerializer.serialize(topic, data.timestamp()); return ByteBuffer .allocate(timestampBytes.length + 1 + keyBytes.length) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index 437d4683eef24..0e25187e788b0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -292,7 +292,6 @@ public void shouldGetInstanceWithKeyWithMergedStreams() { final KeyQueryMetadata expected = new KeyQueryMetadata(hostTwo, mkSet(hostOne), 2); - final KeyQueryMetadata actual = metadataState.keyQueryMetadataForKey("merged-table", "the-key", (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(2))); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java index 93d00fb306c1f..9e083ebbaf2b6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java @@ -133,7 +133,7 @@ public void shouldCleanupSegmentsThatHaveExpired() { public void shouldGetSegmentForTimestamp() { final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L); segments.getOrCreateSegmentIfLive(1, context, -1L); - assertEquals(segment, segments.getSegmentForTimestamp(0L)); + assertEquals(segment, segments.segmentForTimestamp(0L)); } @Test @@ -169,11 +169,11 @@ public void shouldOpenExistingSegments() { segments = new KeyValueSegments("test", METRICS_SCOPE, 4, 1); segments.openExisting(context, -1L); - assertTrue(segments.getSegmentForTimestamp(0).isOpen()); - assertTrue(segments.getSegmentForTimestamp(1).isOpen()); - assertTrue(segments.getSegmentForTimestamp(2).isOpen()); - assertTrue(segments.getSegmentForTimestamp(3).isOpen()); - assertTrue(segments.getSegmentForTimestamp(4).isOpen()); + assertTrue(segments.segmentForTimestamp(0).isOpen()); + assertTrue(segments.segmentForTimestamp(1).isOpen()); + assertTrue(segments.segmentForTimestamp(2).isOpen()); + assertTrue(segments.segmentForTimestamp(3).isOpen()); + assertTrue(segments.segmentForTimestamp(4).isOpen()); } @Test @@ -342,7 +342,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex public void shouldClearSegmentsOnClose() { segments.getOrCreateSegmentIfLive(0, context, -1L); segments.close(); - assertThat(segments.getSegmentForTimestamp(0), is(nullValue())); + assertThat(segments.segmentForTimestamp(0), is(nullValue())); } private void verifyCorrectSegments(final long first, final int numSegments) { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java index 74a44d31ab9f0..8e5a27b1218ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentsTest.java @@ -167,10 +167,10 @@ public void shouldGetSegmentForTimestamp() { final LogicalKeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context, 0L); final LogicalKeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context, SEGMENT_INTERVAL); - assertEquals(segment1, segments.getSegmentForTimestamp(0L)); - assertEquals(segment1, segments.getSegmentForTimestamp(SEGMENT_INTERVAL - 1)); - assertEquals(segment2, segments.getSegmentForTimestamp(SEGMENT_INTERVAL)); - assertEquals(segment2, segments.getSegmentForTimestamp(2 * SEGMENT_INTERVAL - 1)); + assertEquals(segment1, segments.segmentForTimestamp(0L)); + assertEquals(segment1, segments.segmentForTimestamp(SEGMENT_INTERVAL - 1)); + assertEquals(segment2, segments.segmentForTimestamp(SEGMENT_INTERVAL)); + assertEquals(segment2, segments.segmentForTimestamp(2 * SEGMENT_INTERVAL - 1)); } @Test @@ -226,7 +226,7 @@ public void shouldClearSegmentsOnClose() { segments.close(); - assertThat(segments.getSegmentForTimestamp(0), is(nullValue())); + assertThat(segments.segmentForTimestamp(0), is(nullValue())); assertThat(segments.getReservedSegment(-1), is(nullValue())); // verify iterators closed as well assertThrows(InvalidStateStoreException.class, all1::hasNext); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java index 1507b2f5958c9..7a4f0937ee547 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java @@ -270,8 +270,8 @@ public void shouldFindAll(final TestCase testCase) { public void shouldGetTimestamps(final TestCase testCase) { final byte[] segmentValue = buildSegmentWithInsertLatest(testCase).serialize(); - assertThat(RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue), equalTo(testCase.nextTimestamp)); - assertThat(RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue), equalTo(testCase.minTimestamp)); + assertThat(RocksDBVersionedStoreSegmentValueFormatter.nextTimestamp(segmentValue), equalTo(testCase.nextTimestamp)); + assertThat(RocksDBVersionedStoreSegmentValueFormatter.minTimestamp(segmentValue), equalTo(testCase.minTimestamp)); } @ParameterizedTest diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java index b4601ba7c6ec5..63a70acb11c05 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsTest.java @@ -134,7 +134,7 @@ public void shouldCleanupSegmentsThatHaveExpired() { public void shouldGetSegmentForTimestamp() { final TimestampedSegment segment = segments.getOrCreateSegmentIfLive(0, context, -1L); segments.getOrCreateSegmentIfLive(1, context, -1L); - assertEquals(segment, segments.getSegmentForTimestamp(0L)); + assertEquals(segment, segments.segmentForTimestamp(0L)); } @Test @@ -170,11 +170,11 @@ public void shouldOpenExistingSegments() { segments = new TimestampedSegments("test", METRICS_SCOPE, 4, 1); segments.openExisting(context, -1L); - assertTrue(segments.getSegmentForTimestamp(0).isOpen()); - assertTrue(segments.getSegmentForTimestamp(1).isOpen()); - assertTrue(segments.getSegmentForTimestamp(2).isOpen()); - assertTrue(segments.getSegmentForTimestamp(3).isOpen()); - assertTrue(segments.getSegmentForTimestamp(4).isOpen()); + assertTrue(segments.segmentForTimestamp(0).isOpen()); + assertTrue(segments.segmentForTimestamp(1).isOpen()); + assertTrue(segments.segmentForTimestamp(2).isOpen()); + assertTrue(segments.segmentForTimestamp(3).isOpen()); + assertTrue(segments.segmentForTimestamp(4).isOpen()); } @Test @@ -343,7 +343,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex public void shouldClearSegmentsOnClose() { segments.getOrCreateSegmentIfLive(0, context, -1L); segments.close(); - assertThat(segments.getSegmentForTimestamp(0), is(nullValue())); + assertThat(segments.segmentForTimestamp(0), is(nullValue())); } private void verifyCorrectSegments(final long first, final int numSegments) {