From d7153fad4aca6f3ed4d51b3cea0d531471875ef1 Mon Sep 17 00:00:00 2001 From: Dedo Cibula Date: Mon, 9 Sep 2024 18:35:04 -0700 Subject: [PATCH 1/5] added interruptible interface to the read change stream partition record tracker --- .../action/ChildPartitionsRecordAction.java | 9 ++++ .../action/DataChangeRecordAction.java | 13 ++++- .../action/HeartbeatRecordAction.java | 6 +++ .../action/QueryChangeStreamAction.java | 7 +++ .../restriction/Interruptible.java | 36 ++++++++++++++ ...ReadChangeStreamPartitionRangeTracker.java | 49 ++++++++++++++++++- .../ChildPartitionsRecordActionTest.java | 37 ++++++++++++-- .../action/DataChangeRecordActionTest.java | 28 +++++++++-- .../action/HeartbeatRecordActionTest.java | 25 ++++++++-- .../action/QueryChangeStreamActionTest.java | 6 +-- .../ReadChangeStreamPartitionDoFnTest.java | 6 +-- ...ChangeStreamPartitionRangeTrackerTest.java | 44 +++++++++++++++++ 12 files changed, 246 insertions(+), 20 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java index ada794d20c3b..fb139aa0f2c0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -113,6 +114,14 @@ public Optional run( final Timestamp startTimestamp = record.getStartTimestamp(); final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime()); + if (tracker instanceof Interruptible + && !((Interruptible) tracker).shouldContinue(startTimestamp)) { + LOG.debug( + "[{}] Soft deadline reached with child partitions record at {}, rescheduling", + token, + startTimestamp); + return Optional.of(ProcessContinuation.resume()); + } if (!tracker.tryClaim(startTimestamp)) { LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, startTimestamp); return Optional.of(ProcessContinuation.stop()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java index 4ceda8afb3e6..60230ae72f54 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import 
org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; @@ -43,7 +44,9 @@ public class DataChangeRecordAction { private static final Logger LOG = LoggerFactory.getLogger(DataChangeRecordAction.class); private final ThroughputEstimator throughputEstimator; - /** @param throughputEstimator an estimator to calculate local throughput of this action. */ + /** + * @param throughputEstimator an estimator to calculate local throughput of this action. + */ public DataChangeRecordAction(ThroughputEstimator throughputEstimator) { this.throughputEstimator = throughputEstimator; } @@ -88,6 +91,14 @@ public Optional run( final Timestamp commitTimestamp = record.getCommitTimestamp(); final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime()); + if (tracker instanceof Interruptible + && !((Interruptible) tracker).shouldContinue(commitTimestamp)) { + LOG.debug( + "[{}] Soft deadline reached with data change record at {}, rescheduling", + token, + commitTimestamp); + return Optional.of(ProcessContinuation.resume()); + } if (!tracker.tryClaim(commitTimestamp)) { LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, commitTimestamp); return Optional.of(ProcessContinuation.stop()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java index 83a232fe2093..37bb572c36c7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -79,6 +80,11 @@ public Optional run( final Timestamp timestamp = record.getTimestamp(); final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime()); + if (tracker instanceof Interruptible && !((Interruptible) tracker).shouldContinue(timestamp)) { + LOG.debug( + "[{}] Soft deadline reached with heartbeat record at {}, rescheduling", token, timestamp); + return Optional.of(ProcessContinuation.resume()); + } if (!tracker.tryClaim(timestamp)) { LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, timestamp); return Optional.of(ProcessContinuation.stop()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 92285946e56f..cba439528a6b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -62,6 +63,7 @@ public class QueryChangeStreamAction { private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class); private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5); + private static final Duration RESTRICTION_TRACKER_TIMEOUT = Duration.standardSeconds(40); private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid"; private final ChangeStreamDao changeStreamDao; @@ -164,6 +166,11 @@ public ProcessContinuation run( new IllegalStateException( "Partition " + token + " not found in metadata table")); + // Set the soft timeout to commit the work if any records have been processed. + if (tracker instanceof Interruptible) { + ((Interruptible) tracker).setSoftTimeout(RESTRICTION_TRACKER_TIMEOUT); + } + try (ChangeStreamResultSet resultSet = changeStreamDao.changeStreamQuery( token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java new file mode 100644 index 000000000000..b96676798af2 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction; + +import com.google.cloud.Timestamp; +import org.joda.time.Duration; + +/** An interruptible interface for timestamp restriction tracker. */ +public interface Interruptible { + /** Sets a soft timeout from now for processing new timestamps. */ + public void setSoftTimeout(Duration duration); + + /** + * Returns true if the timestamp tracker can process new timestamps or false if it should + * interrupt processing. + * + * @return {@code true} if the position processing should continue, {@code false} if the soft + * deadline has been reached and we have fully processed the previous position. 
+ */ + public boolean shouldContinue(Timestamp position); +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java index 177a1b7494af..a8555b526c7d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; /** * This restriction tracker delegates most of its behavior to an internal {@link @@ -34,9 +36,12 @@ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) -public class ReadChangeStreamPartitionRangeTracker extends TimestampRangeTracker { +public class ReadChangeStreamPartitionRangeTracker extends TimestampRangeTracker + implements Interruptible { private final PartitionMetadata partition; + private Instant softDeadline; + private boolean continueProcessing = true; /** * Receives the partition that will be queried and the timestamp range that belongs to it. @@ -49,6 +54,48 @@ public ReadChangeStreamPartitionRangeTracker(PartitionMetadata partition, Timest this.partition = partition; } + /** + * Sets a soft timeout from now for processing new positions. After the timeout the shouldContinue + * will start returning false indicating an early exit from processing. 
+ */ + @Override + public void setSoftTimeout(Duration duration) { + softDeadline = new Instant(timeSupplier.get().toSqlTimestamp()).plus(duration); + continueProcessing = true; + } + + /** + * Returns true if the restriction tracker can claim new positions. + * + *

<p>If soft timeout isn't set always returns true. Otherwise: + * + * <ol> + * <li>If soft deadline hasn't been reached always returns true. + * <li>If soft deadline has been reached but we haven't processed any positions returns true. + * <li>If soft deadline has been reached but the new position is the same as the last attempted + * position returns true. + * <li>If soft deadline has been reached and the new position differs from the last attempted + * position returns false. + * </ol>
+ * + * @return {@code true} if the position processing should continue, {@code false} if the soft + * deadline has been reached and we have fully processed the previous position. + */ + @Override + public boolean shouldContinue(Timestamp position) { + if (!continueProcessing) { + return false; + } + if (softDeadline == null || lastAttemptedPosition == null) { + return true; + } + + continueProcessing &= + new Instant(timeSupplier.get().toSqlTimestamp()).isBefore(softDeadline) + || position.equals(lastAttemptedPosition); + return continueProcessing; + } + /** * Attempts to claim the given position. * diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java index 03d390ea0d5d..b1c57733105c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java @@ -38,11 +38,10 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestTransactionAnswer; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; -import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -54,7 +53,7 @@ public class ChildPartitionsRecordActionTest { private InTransactionContext transaction; private ChangeStreamMetrics metrics; private ChildPartitionsRecordAction action; - private RestrictionTracker tracker; + private ReadChangeStreamPartitionRangeTracker tracker; private ManualWatermarkEstimator watermarkEstimator; @Before @@ -63,7 +62,7 @@ public void setUp() { transaction = mock(InTransactionContext.class); metrics = mock(ChangeStreamMetrics.class); action = new ChildPartitionsRecordAction(dao, metrics); - tracker = mock(RestrictionTracker.class); + tracker = mock(ReadChangeStreamPartitionRangeTracker.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); when(dao.runInTransaction(any(), anyObject())) @@ -88,6 +87,7 @@ public void testRestrictionClaimedAndIsSplitCase() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition("childPartition1")).thenReturn(null); when(transaction.getPartition("childPartition2")).thenReturn(null); @@ -139,6 +139,7 @@ public void testRestrictionClaimedAnsIsSplitCaseAndChildExists() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition("childPartition1")).thenReturn(mock(Struct.class)); 
when(transaction.getPartition("childPartition2")).thenReturn(mock(Struct.class)); @@ -169,6 +170,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildNotExists() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition(childPartitionToken)).thenReturn(null); @@ -209,6 +211,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildExists() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition(childPartitionToken)).thenReturn(mock(Struct.class)); @@ -234,6 +237,7 @@ public void testRestrictionNotClaimed() { new ChildPartition("childPartition2", partitionToken)), null); when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(false); final Optional maybeContinuation = @@ -243,4 +247,29 @@ public void testRestrictionNotClaimed() { verify(watermarkEstimator, never()).setWatermark(any()); verify(dao, never()).insert(any()); } + + @Test + public void testSoftDeadlineReached() { + final String partitionToken = "partitionToken"; + final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L); + final PartitionMetadata partition = mock(PartitionMetadata.class); + final ChildPartitionsRecord record = + new ChildPartitionsRecord( + startTimestamp, + "recordSequence", + Arrays.asList( + new ChildPartition("childPartition1", partitionToken), + new ChildPartition("childPartition2", 
partitionToken)), + null); + when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(false); + when(tracker.tryClaim(startTimestamp)).thenReturn(true); + + final Optional maybeContinuation = + action.run(partition, record, tracker, watermarkEstimator); + + assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); + verify(watermarkEstimator, never()).setWatermark(any()); + verify(dao, never()).insert(any()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java index ac8d48725299..7522cb60ebac 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java @@ -30,11 +30,10 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -43,7 +42,7 @@ public class DataChangeRecordActionTest { 
private DataChangeRecordAction action; private PartitionMetadata partition; - private RestrictionTracker tracker; + private ReadChangeStreamPartitionRangeTracker tracker; private OutputReceiver outputReceiver; private ManualWatermarkEstimator watermarkEstimator; private BytesThroughputEstimator throughputEstimator; @@ -53,7 +52,7 @@ public void setUp() { throughputEstimator = mock(BytesThroughputEstimator.class); action = new DataChangeRecordAction(throughputEstimator); partition = mock(PartitionMetadata.class); - tracker = mock(RestrictionTracker.class); + tracker = mock(ReadChangeStreamPartitionRangeTracker.class); outputReceiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -65,6 +64,7 @@ public void testRestrictionClaimed() { final Instant instant = new Instant(timestamp.toSqlTimestamp().getTime()); final DataChangeRecord record = mock(DataChangeRecord.class); when(record.getCommitTimestamp()).thenReturn(timestamp); + when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); @@ -83,6 +83,7 @@ public void testRestrictionNotClaimed() { final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); final DataChangeRecord record = mock(DataChangeRecord.class); when(record.getCommitTimestamp()).thenReturn(timestamp); + when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(false); when(partition.getPartitionToken()).thenReturn(partitionToken); @@ -94,4 +95,23 @@ public void testRestrictionNotClaimed() { verify(watermarkEstimator, never()).setWatermark(any()); verify(throughputEstimator, never()).update(any(), any()); } + + @Test + public void testSoftDeadlineReached() { + final String partitionToken = "partitionToken"; + final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + final DataChangeRecord record = mock(DataChangeRecord.class); + 
when(record.getCommitTimestamp()).thenReturn(timestamp); + when(tracker.shouldContinue(timestamp)).thenReturn(false); + when(tracker.tryClaim(timestamp)).thenReturn(true); + when(partition.getPartitionToken()).thenReturn(partitionToken); + + final Optional maybeContinuation = + action.run(partition, record, tracker, outputReceiver, watermarkEstimator); + + assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); + verify(outputReceiver, never()).outputWithTimestamp(any(), any()); + verify(watermarkEstimator, never()).setWatermark(any()); + verify(throughputEstimator, never()).update(any(), any()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java index 77333bbbc96e..7f0be39492c3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java @@ -29,10 +29,9 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -41,7 +40,7 
@@ public class HeartbeatRecordActionTest { private HeartbeatRecordAction action; private PartitionMetadata partition; - private RestrictionTracker tracker; + private ReadChangeStreamPartitionRangeTracker tracker; private ManualWatermarkEstimator watermarkEstimator; @Before @@ -49,7 +48,7 @@ public void setUp() { final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class); action = new HeartbeatRecordAction(metrics); partition = mock(PartitionMetadata.class); - tracker = mock(RestrictionTracker.class); + tracker = mock(ReadChangeStreamPartitionRangeTracker.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -58,6 +57,7 @@ public void testRestrictionClaimed() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); @@ -73,6 +73,7 @@ public void testRestrictionNotClaimed() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(false); when(partition.getPartitionToken()).thenReturn(partitionToken); @@ -82,4 +83,20 @@ public void testRestrictionNotClaimed() { assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); } + + @Test + public void testSoftDeadlineReached() { + final String partitionToken = "partitionToken"; + final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + + when(tracker.shouldContinue(timestamp)).thenReturn(false); + when(tracker.tryClaim(timestamp)).thenReturn(true); + when(partition.getPartitionToken()).thenReturn(partitionToken); + + final Optional maybeContinuation = + action.run(partition, new HeartbeatRecord(timestamp, null), tracker, 
watermarkEstimator); + + assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); + verify(watermarkEstimator, never()).setWatermark(any()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index bf7b0adfd475..1734f8fbfda5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -40,12 +40,12 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -65,7 +65,7 @@ public class QueryChangeStreamActionTest { private PartitionMetadata partition; private ChangeStreamMetrics metrics; private TimestampRange restriction; - private RestrictionTracker restrictionTracker; + private ReadChangeStreamPartitionRangeTracker restrictionTracker; 
private OutputReceiver outputReceiver; private ChangeStreamRecordMapper changeStreamRecordMapper; private PartitionMetadataMapper partitionMetadataMapper; @@ -110,7 +110,7 @@ public void setUp() throws Exception { .setScheduledAt(Timestamp.now()) .build(); restriction = mock(TimestampRange.class); - restrictionTracker = mock(RestrictionTracker.class); + restrictionTracker = mock(ReadChangeStreamPartitionRangeTracker.class); outputReceiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); bundleFinalizer = new BundleFinalizerStub(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index 538bdf768664..920786b72764 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -41,12 +41,12 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; -import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -66,7 +66,7 @@ public class ReadChangeStreamPartitionDoFnTest { private ReadChangeStreamPartitionDoFn doFn; private PartitionMetadata partition; private TimestampRange restriction; - private RestrictionTracker tracker; + private ReadChangeStreamPartitionRangeTracker tracker; private OutputReceiver receiver; private ManualWatermarkEstimator watermarkEstimator; private BundleFinalizer bundleFinalizer; @@ -107,7 +107,7 @@ public void setUp() { .setScheduledAt(Timestamp.now()) .build(); restriction = mock(TimestampRange.class); - tracker = mock(RestrictionTracker.class); + tracker = mock(ReadChangeStreamPartitionRangeTracker.class); receiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); bundleFinalizer = mock(BundleFinalizer.class); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java index 7c4ae2c9b8b4..341f28215373 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java @@ -27,6 +27,7 @@ import com.google.cloud.Timestamp; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.joda.time.Duration; import org.junit.Before; import org.junit.Test; @@ -60,4 +61,47 @@ 
public void testTrySplitReturnsNullForInitialPartition() { assertNull(tracker.trySplit(0.0D)); } + + @Test + public void testShouldContinueWithoutTimeout() { + assertEquals(range, tracker.currentRestriction()); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(20L))); + assertFalse(tracker.tryClaim(Timestamp.ofTimeMicroseconds(20L))); + } + + @Test + public void testShouldContinueWithTimeout() { + assertEquals(range, tracker.currentRestriction()); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(10L, 0)); + tracker.setSoftTimeout(Duration.standardSeconds(30)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(15L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(20L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(30L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); + tracker.setTimeSupplier(() -> 
Timestamp.ofTimeSecondsAndNanos(39L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(40L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(50L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); + assertFalse(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(19L))); + } } From d252d3d3aabcee42a1547309772a0be02d9b8e5e Mon Sep 17 00:00:00 2001 From: Dedo Cibula Date: Mon, 9 Dec 2024 12:29:33 -0800 Subject: [PATCH 2/5] replaced interruptible interface with an explicit restriction interrupter class --- .../action/ChildPartitionsRecordAction.java | 6 +- .../action/DataChangeRecordAction.java | 6 +- .../action/HeartbeatRecordAction.java | 5 +- .../action/QueryChangeStreamAction.java | 22 +++-- .../restriction/Interruptible.java | 36 -------- ...ReadChangeStreamPartitionRangeTracker.java | 47 +---------- .../restriction/RestrictionInterrupter.java | 84 +++++++++++++++++++ .../ChildPartitionsRecordActionTest.java | 29 ++++--- .../action/DataChangeRecordActionTest.java | 20 +++-- .../action/HeartbeatRecordActionTest.java | 35 ++++++-- .../action/QueryChangeStreamActionTest.java | 60 ++++++------- .../ReadChangeStreamPartitionDoFnTest.java | 12 +-- ...ChangeStreamPartitionRangeTrackerTest.java | 44 ---------- .../RestrictionInterrupterTest.java | 60 +++++++++++++ 14 files changed, 257 insertions(+), 209 deletions(-) delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java create mode 100644 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java index fb139aa0f2c0..291213e93ada 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -106,6 +106,7 @@ public Optional run( PartitionMetadata partition, ChildPartitionsRecord record, RestrictionTracker tracker, + RestrictionInterrupter interrupter, ManualWatermarkEstimator watermarkEstimator) { final String token = partition.getPartitionToken(); @@ -114,8 +115,7 @@ public Optional run( final Timestamp startTimestamp = record.getStartTimestamp(); final Instant startInstant = new 
Instant(startTimestamp.toSqlTimestamp().getTime()); - if (tracker instanceof Interruptible - && !((Interruptible) tracker).shouldContinue(startTimestamp)) { + if (interrupter.tryInterrupt(startTimestamp)) { LOG.debug( "[{}] Soft deadline reached with child partitions record at {}, rescheduling", token, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java index 60230ae72f54..6fc4530f9a68 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java @@ -24,7 +24,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; @@ -83,6 +83,7 @@ public Optional run( PartitionMetadata partition, DataChangeRecord record, RestrictionTracker tracker, + RestrictionInterrupter interrupter, OutputReceiver outputReceiver, ManualWatermarkEstimator watermarkEstimator) { @@ -91,8 +92,7 @@ public Optional run( final Timestamp commitTimestamp = record.getCommitTimestamp(); final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime()); - if (tracker 
instanceof Interruptible - && !((Interruptible) tracker).shouldContinue(commitTimestamp)) { + if (interrupter.tryInterrupt(commitTimestamp)) { LOG.debug( "[{}] Soft deadline reached with data change record at {}, rescheduling", token, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java index 37bb572c36c7..446907b9024a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java @@ -22,7 +22,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -73,6 +73,7 @@ public Optional run( PartitionMetadata partition, HeartbeatRecord record, RestrictionTracker tracker, + RestrictionInterrupter interrupter, ManualWatermarkEstimator watermarkEstimator) { final String token = partition.getPartitionToken(); @@ -80,7 +81,7 @@ public Optional run( final Timestamp timestamp = record.getTimestamp(); final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime()); - if (tracker instanceof Interruptible && !((Interruptible) 
tracker).shouldContinue(timestamp)) { + if (interrupter.tryInterrupt(timestamp)) { LOG.debug( "[{}] Soft deadline reached with heartbeat record at {}, rescheduling", token, timestamp); return Optional.of(ProcessContinuation.resume()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index cba439528a6b..67c71b0b46d0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -166,10 +166,9 @@ public ProcessContinuation run( new IllegalStateException( "Partition " + token + " not found in metadata table")); - // Set the soft timeout to commit the work if any records have been processed. - if (tracker instanceof Interruptible) { - ((Interruptible) tracker).setSoftTimeout(RESTRICTION_TRACKER_TIMEOUT); - } + // Interrupter with soft timeout to commit the work if any records have been processed. 
+ RestrictionInterrupter interrupter = + RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT); try (ChangeStreamResultSet resultSet = changeStreamDao.changeStreamQuery( @@ -189,16 +188,25 @@ public ProcessContinuation run( updatedPartition, (DataChangeRecord) record, tracker, + interrupter, receiver, watermarkEstimator); } else if (record instanceof HeartbeatRecord) { maybeContinuation = heartbeatRecordAction.run( - updatedPartition, (HeartbeatRecord) record, tracker, watermarkEstimator); + updatedPartition, + (HeartbeatRecord) record, + tracker, + interrupter, + watermarkEstimator); } else if (record instanceof ChildPartitionsRecord) { maybeContinuation = childPartitionsRecordAction.run( - updatedPartition, (ChildPartitionsRecord) record, tracker, watermarkEstimator); + updatedPartition, + (ChildPartitionsRecord) record, + tracker, + interrupter, + watermarkEstimator); } else { LOG.error("[{}] Unknown record type {}", token, record.getClass()); throw new IllegalArgumentException("Unknown record type " + record.getClass()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java deleted file mode 100644 index b96676798af2..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction; - -import com.google.cloud.Timestamp; -import org.joda.time.Duration; - -/** An interruptible interface for timestamp restriction tracker. */ -public interface Interruptible { - /** Sets a soft timeout from now for processing new timestamps. */ - public void setSoftTimeout(Duration duration); - - /** - * Returns true if the timestamp tracker can process new timestamps or false if it should - * interrupt processing. - * - * @return {@code true} if the position processing should continue, {@code false} if the soft - * deadline has been reached and we have fully processed the previous position. 
- */ - public boolean shouldContinue(Timestamp position); -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java index a8555b526c7d..735ca96a8b7b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java @@ -36,12 +36,9 @@ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) -public class ReadChangeStreamPartitionRangeTracker extends TimestampRangeTracker - implements Interruptible { +public class ReadChangeStreamPartitionRangeTracker extends TimestampRangeTracker { private final PartitionMetadata partition; - private Instant softDeadline; - private boolean continueProcessing = true; /** * Receives the partition that will be queried and the timestamp range that belongs to it. @@ -54,48 +51,6 @@ public ReadChangeStreamPartitionRangeTracker(PartitionMetadata partition, Timest this.partition = partition; } - /** - * Sets a soft timeout from now for processing new positions. After the timeout the shouldContinue - * will start returning false indicating an early exit from processing. - */ - @Override - public void setSoftTimeout(Duration duration) { - softDeadline = new Instant(timeSupplier.get().toSqlTimestamp()).plus(duration); - continueProcessing = true; - } - - /** - * Returns true if the restriction tracker can claim new positions. - * - *

If soft timeout isn't set always returns true. Otherwise: - * - *

    - *
  1. If soft deadline hasn't been reached always returns true. - *
  2. If soft deadline has been reached but we haven't processed any positions returns true. - *
  3. If soft deadline has been reached but the new position is the same as the last attempted - * position returns true. - *
  4. If soft deadline has been reached and the new position differs from the last attempted - * position returns false. - *
- * - * @return {@code true} if the position processing should continue, {@code false} if the soft - * deadline has been reached and we have fully processed the previous position. - */ - @Override - public boolean shouldContinue(Timestamp position) { - if (!continueProcessing) { - return false; - } - if (softDeadline == null || lastAttemptedPosition == null) { - return true; - } - - continueProcessing &= - new Instant(timeSupplier.get().toSqlTimestamp()).isBefore(softDeadline) - || position.equals(lastAttemptedPosition); - return continueProcessing; - } - /** * Attempts to claim the given position. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java new file mode 100644 index 000000000000..81696550747a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction; + +import java.util.function.Supplier; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** An interrupter for restriction tracker of type T. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class RestrictionInterrupter { + private T lastAttemptedPosition; + + private Supplier timeSupplier; + private Instant softDeadline; + private boolean hasInterrupted = true; + + /** + * Sets a soft timeout from now for processing new positions. After the timeout the tryInterrupt + * will start returning true indicating an early exit from processing. + */ + public static RestrictionInterrupter withSoftTimeout(Duration timeout) { + return new RestrictionInterrupter(() -> Instant.now(), timeout); + } + + RestrictionInterrupter(Supplier timeSupplier, Duration timeout) { + this.timeSupplier = timeSupplier; + this.softDeadline = this.timeSupplier.get().plus(timeout); + hasInterrupted = false; + } + + @VisibleForTesting + void setTimeSupplier(Supplier timeSupplier) { + this.timeSupplier = timeSupplier; + } + + /** + * Returns true if the restriction tracker should be interrupted in claiming new positions. + * + *
    + *
  1. If soft deadline hasn't been reached always returns false. + *
  2. If soft deadline has been reached but we haven't processed any positions returns false. + *
  3. If soft deadline has been reached but the new position is the same as the last attempted + * position returns false. + *
  4. If soft deadline has been reached and the new position differs from the last attempted + * position returns true. + *
+ * + * @return {@code true} if the soft deadline has been reached and we have fully processed the + * previous position, {@code false} if the position processing should continue. + */ + public boolean tryInterrupt(T position) { + if (hasInterrupted) { + return true; + } + if (lastAttemptedPosition == null) { + lastAttemptedPosition = position; + return false; + } + + hasInterrupted |= + timeSupplier.get().isAfter(softDeadline) && !position.equals(lastAttemptedPosition); + lastAttemptedPosition = position; + return hasInterrupted; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java index b1c57733105c..5815bf0c6fdd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java @@ -38,10 +38,12 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestTransactionAnswer; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -53,7 +55,8 @@ public class ChildPartitionsRecordActionTest { private InTransactionContext transaction; private ChangeStreamMetrics metrics; private ChildPartitionsRecordAction action; - private ReadChangeStreamPartitionRangeTracker tracker; + private RestrictionTracker tracker; + private RestrictionInterrupter interrupter; private ManualWatermarkEstimator watermarkEstimator; @Before @@ -62,7 +65,8 @@ public void setUp() { transaction = mock(InTransactionContext.class); metrics = mock(ChangeStreamMetrics.class); action = new ChildPartitionsRecordAction(dao, metrics); - tracker = mock(ReadChangeStreamPartitionRangeTracker.class); + tracker = mock(RestrictionTracker.class); + interrupter = mock(RestrictionInterrupter.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); when(dao.runInTransaction(any(), anyObject())) @@ -87,13 +91,12 @@ public void testRestrictionClaimedAndIsSplitCase() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition("childPartition1")).thenReturn(null); when(transaction.getPartition("childPartition2")).thenReturn(null); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -139,13 +142,12 @@ public void testRestrictionClaimedAnsIsSplitCaseAndChildExists() { 
when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition("childPartition1")).thenReturn(mock(Struct.class)); when(transaction.getPartition("childPartition2")).thenReturn(mock(Struct.class)); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -170,12 +172,11 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildNotExists() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition(childPartitionToken)).thenReturn(null); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -211,12 +212,11 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildExists() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); 
when(transaction.getPartition(childPartitionToken)).thenReturn(mock(Struct.class)); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -237,11 +237,10 @@ public void testRestrictionNotClaimed() { new ChildPartition("childPartition2", partitionToken)), null); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(false); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); @@ -262,11 +261,11 @@ public void testSoftDeadlineReached() { new ChildPartition("childPartition2", partitionToken)), null); when(partition.getPartitionToken()).thenReturn(partitionToken); - when(tracker.shouldContinue(startTimestamp)).thenReturn(false); + when(interrupter.tryInterrupt(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java index 
7522cb60ebac..6569f810812c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java @@ -30,10 +30,12 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -42,7 +44,8 @@ public class DataChangeRecordActionTest { private DataChangeRecordAction action; private PartitionMetadata partition; - private ReadChangeStreamPartitionRangeTracker tracker; + private RestrictionTracker tracker; + private RestrictionInterrupter interrupter; private OutputReceiver outputReceiver; private ManualWatermarkEstimator watermarkEstimator; private BytesThroughputEstimator throughputEstimator; @@ -52,7 +55,8 @@ public void setUp() { throughputEstimator = mock(BytesThroughputEstimator.class); action = new DataChangeRecordAction(throughputEstimator); partition = mock(PartitionMetadata.class); - tracker = mock(ReadChangeStreamPartitionRangeTracker.class); + tracker = mock(RestrictionTracker.class); + interrupter = 
mock(RestrictionInterrupter.class); outputReceiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -64,12 +68,11 @@ public void testRestrictionClaimed() { final Instant instant = new Instant(timestamp.toSqlTimestamp().getTime()); final DataChangeRecord record = mock(DataChangeRecord.class); when(record.getCommitTimestamp()).thenReturn(timestamp); - when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, record, tracker, outputReceiver, watermarkEstimator); + action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(outputReceiver).outputWithTimestamp(record, instant); @@ -83,12 +86,11 @@ public void testRestrictionNotClaimed() { final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); final DataChangeRecord record = mock(DataChangeRecord.class); when(record.getCommitTimestamp()).thenReturn(timestamp); - when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(false); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, record, tracker, outputReceiver, watermarkEstimator); + action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(outputReceiver, never()).outputWithTimestamp(any(), any()); @@ -102,12 +104,12 @@ public void testSoftDeadlineReached() { final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); final DataChangeRecord record = mock(DataChangeRecord.class); when(record.getCommitTimestamp()).thenReturn(timestamp); - when(tracker.shouldContinue(timestamp)).thenReturn(false); + 
when(interrupter.tryInterrupt(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, record, tracker, outputReceiver, watermarkEstimator); + action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); verify(outputReceiver, never()).outputWithTimestamp(any(), any()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java index 7f0be39492c3..56d1825c8a18 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java @@ -29,9 +29,11 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -40,7 +42,8 @@ public 
class HeartbeatRecordActionTest { private HeartbeatRecordAction action; private PartitionMetadata partition; - private ReadChangeStreamPartitionRangeTracker tracker; + private RestrictionTracker tracker; + private RestrictionInterrupter interrupter; private ManualWatermarkEstimator watermarkEstimator; @Before @@ -48,7 +51,8 @@ public void setUp() { final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class); action = new HeartbeatRecordAction(metrics); partition = mock(PartitionMetadata.class); - tracker = mock(ReadChangeStreamPartitionRangeTracker.class); + tracker = mock(RestrictionTracker.class); + interrupter = mock(RestrictionInterrupter.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -57,12 +61,16 @@ public void testRestrictionClaimed() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); - when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator); + action.run( + partition, + new HeartbeatRecord(timestamp, null), + tracker, + interrupter, + watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime())); @@ -73,12 +81,16 @@ public void testRestrictionNotClaimed() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); - when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(false); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator); + action.run( + partition, + new HeartbeatRecord(timestamp, 
null), + tracker, + interrupter, + watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); @@ -89,12 +101,17 @@ public void testSoftDeadlineReached() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); - when(tracker.shouldContinue(timestamp)).thenReturn(false); + when(interrupter.tryInterrupt(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator); + action.run( + partition, + new HeartbeatRecord(timestamp, null), + tracker, + interrupter, + watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index 1734f8fbfda5..533cb46393a1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -40,12 +41,13 @@ 
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -65,7 +67,7 @@ public class QueryChangeStreamActionTest { private PartitionMetadata partition; private ChangeStreamMetrics metrics; private TimestampRange restriction; - private ReadChangeStreamPartitionRangeTracker restrictionTracker; + private RestrictionTracker restrictionTracker; private OutputReceiver outputReceiver; private ChangeStreamRecordMapper changeStreamRecordMapper; private PartitionMetadataMapper partitionMetadataMapper; @@ -110,7 +112,7 @@ public void setUp() throws Exception { .setScheduledAt(Timestamp.now()) .build(); restriction = mock(TimestampRange.class); - restrictionTracker = mock(ReadChangeStreamPartitionRangeTracker.class); + restrictionTracker = mock(RestrictionTracker.class); outputReceiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); bundleFinalizer = new BundleFinalizerStub(); @@ -144,10 +146,10 @@ public void testQueryChangeStreamWithDataChangeRecord() { 
when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); when(dataChangeRecordAction.run( - partition, record1, restrictionTracker, outputReceiver, watermarkEstimator)) + eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator))) .thenReturn(Optional.empty()); when(dataChangeRecordAction.run( - partition, record2, restrictionTracker, outputReceiver, watermarkEstimator)) + eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -157,13 +159,13 @@ public void testQueryChangeStreamWithDataChangeRecord() { assertEquals(ProcessContinuation.stop(), result); verify(dataChangeRecordAction) - .run(partition, record1, restrictionTracker, outputReceiver, watermarkEstimator); + .run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator)); verify(dataChangeRecordAction) - .run(partition, record2, restrictionTracker, outputReceiver, watermarkEstimator); + .run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -188,9 +190,9 @@ public void testQueryChangeStreamWithHeartbeatRecord() { 
when(resultSet.getMetadata()).thenReturn(resultSetMetadata); when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); - when(heartbeatRecordAction.run(partition, record1, restrictionTracker, watermarkEstimator)) + when(heartbeatRecordAction.run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) .thenReturn(Optional.empty()); - when(heartbeatRecordAction.run(partition, record2, restrictionTracker, watermarkEstimator)) + when(heartbeatRecordAction.run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -199,12 +201,12 @@ public void testQueryChangeStreamWithHeartbeatRecord() { partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); assertEquals(ProcessContinuation.stop(), result); - verify(heartbeatRecordAction).run(partition, record1, restrictionTracker, watermarkEstimator); - verify(heartbeatRecordAction).run(partition, record2, restrictionTracker, watermarkEstimator); + verify(heartbeatRecordAction).run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); + verify(heartbeatRecordAction).run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, 
never()).tryClaim(any()); } @@ -230,10 +232,10 @@ public void testQueryChangeStreamWithChildPartitionsRecord() { when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); when(childPartitionsRecordAction.run( - partition, record1, restrictionTracker, watermarkEstimator)) + eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) .thenReturn(Optional.empty()); when(childPartitionsRecordAction.run( - partition, record2, restrictionTracker, watermarkEstimator)) + eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -243,13 +245,13 @@ public void testQueryChangeStreamWithChildPartitionsRecord() { assertEquals(ProcessContinuation.stop(), result); verify(childPartitionsRecordAction) - .run(partition, record1, restrictionTracker, watermarkEstimator); + .run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); verify(childPartitionsRecordAction) - .run(partition, record2, restrictionTracker, watermarkEstimator); + .run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -279,7 +281,7 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStart() { 
when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); when(childPartitionsRecordAction.run( - partition, record2, restrictionTracker, watermarkEstimator)) + eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -289,13 +291,13 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStart() { assertEquals(ProcessContinuation.stop(), result); verify(childPartitionsRecordAction) - .run(partition, record1, restrictionTracker, watermarkEstimator); + .run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); verify(childPartitionsRecordAction) - .run(partition, record2, restrictionTracker, watermarkEstimator); + .run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -320,9 +322,9 @@ public void testQueryChangeStreamWithStreamFinished() { verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + 
verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); } private static class BundleFinalizerStub implements BundleFinalizer { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index 920786b72764..87588eb8d0a9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -41,12 +41,12 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -66,7 +66,7 @@ public class ReadChangeStreamPartitionDoFnTest { private ReadChangeStreamPartitionDoFn 
doFn; private PartitionMetadata partition; private TimestampRange restriction; - private ReadChangeStreamPartitionRangeTracker tracker; + private RestrictionTracker tracker; private OutputReceiver receiver; private ManualWatermarkEstimator watermarkEstimator; private BundleFinalizer bundleFinalizer; @@ -107,7 +107,7 @@ public void setUp() { .setScheduledAt(Timestamp.now()) .build(); restriction = mock(TimestampRange.class); - tracker = mock(ReadChangeStreamPartitionRangeTracker.class); + tracker = mock(RestrictionTracker.class); receiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); bundleFinalizer = mock(BundleFinalizer.class); @@ -149,9 +149,9 @@ public void testQueryChangeStreamMode() { verify(queryChangeStreamAction) .run(partition, tracker, receiver, watermarkEstimator, bundleFinalizer); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(tracker, never()).tryClaim(any()); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java index 341f28215373..7c4ae2c9b8b4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java @@ -27,7 +27,6 @@ import com.google.cloud.Timestamp; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.joda.time.Duration; import org.junit.Before; import org.junit.Test; @@ -61,47 +60,4 @@ public void testTrySplitReturnsNullForInitialPartition() { assertNull(tracker.trySplit(0.0D)); } - - @Test - public void testShouldContinueWithoutTimeout() { - assertEquals(range, tracker.currentRestriction()); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(20L))); - assertFalse(tracker.tryClaim(Timestamp.ofTimeMicroseconds(20L))); - } - - @Test - public void testShouldContinueWithTimeout() { - assertEquals(range, tracker.currentRestriction()); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(10L, 0)); - tracker.setSoftTimeout(Duration.standardSeconds(30)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(15L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); - 
tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(20L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(30L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(39L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(40L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); - tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(50L, 0)); - assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); - assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); - assertFalse(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(19L))); - } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java new file mode 100644 index 000000000000..dc76465dbf39 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; + +public class RestrictionInterrupterTest { + + @Test + public void testTryInterrupt() { + RestrictionInterrupter interrupter = + new RestrictionInterrupter( + () -> Instant.ofEpochSecond(0), Duration.standardSeconds(30)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(10)); + assertFalse(interrupter.tryInterrupt(1)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(15)); + assertFalse(interrupter.tryInterrupt(2)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(30)); + assertFalse(interrupter.tryInterrupt(3)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); + assertFalse(interrupter.tryInterrupt(3)); + assertTrue(interrupter.tryInterrupt(4)); + assertTrue(interrupter.tryInterrupt(5)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50)); + assertTrue(interrupter.tryInterrupt(5)); + } + + @Test + public void testTryInterruptNoPreviousPosition() { + RestrictionInterrupter interrupter = + new RestrictionInterrupter( + () -> Instant.ofEpochSecond(0), Duration.standardSeconds(30)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); + 
assertFalse(interrupter.tryInterrupt(1)); + assertFalse(interrupter.tryInterrupt(1)); + assertTrue(interrupter.tryInterrupt(2)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50)); + assertTrue(interrupter.tryInterrupt(3)); + } +} From 636f02c69e0ed4a4c283cf5a266a86643876f86d Mon Sep 17 00:00:00 2001 From: Dedo Cibula Date: Mon, 9 Dec 2024 12:44:56 -0800 Subject: [PATCH 3/5] applied spotless --- .../action/DataChangeRecordAction.java | 4 +- ...ReadChangeStreamPartitionRangeTracker.java | 2 - .../action/QueryChangeStreamActionTest.java | 106 +++++++++++++++--- 3 files changed, 92 insertions(+), 20 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java index 6fc4530f9a68..9913d854c5fa 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java @@ -44,9 +44,7 @@ public class DataChangeRecordAction { private static final Logger LOG = LoggerFactory.getLogger(DataChangeRecordAction.class); private final ThroughputEstimator throughputEstimator; - /** - * @param throughputEstimator an estimator to calculate local throughput of this action. - */ + /** @param throughputEstimator an estimator to calculate local throughput of this action. 
*/ public DataChangeRecordAction(ThroughputEstimator throughputEstimator) { this.throughputEstimator = throughputEstimator; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java index 735ca96a8b7b..177a1b7494af 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java @@ -22,8 +22,6 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.checkerframework.checker.nullness.qual.Nullable; -import org.joda.time.Duration; -import org.joda.time.Instant; /** * This restriction tracker delegates most of its behavior to an internal {@link diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index 533cb46393a1..c73a62a812bd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -146,10 +146,20 @@ public void testQueryChangeStreamWithDataChangeRecord() { when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) 
.thenReturn(Arrays.asList(record1, record2)); when(dataChangeRecordAction.run( - eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator))) + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(outputReceiver), + eq(watermarkEstimator))) .thenReturn(Optional.empty()); when(dataChangeRecordAction.run( - eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator))) + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(outputReceiver), + eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -159,9 +169,21 @@ public void testQueryChangeStreamWithDataChangeRecord() { assertEquals(ProcessContinuation.stop(), result); verify(dataChangeRecordAction) - .run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator)); + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(outputReceiver), + eq(watermarkEstimator)); verify(dataChangeRecordAction) - .run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(outputReceiver), eq(watermarkEstimator)); + .run( + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(outputReceiver), + eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); @@ -190,9 +212,19 @@ public void testQueryChangeStreamWithHeartbeatRecord() { when(resultSet.getMetadata()).thenReturn(resultSetMetadata); when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) 
.thenReturn(Arrays.asList(record1, record2)); - when(heartbeatRecordAction.run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) + when(heartbeatRecordAction.run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) .thenReturn(Optional.empty()); - when(heartbeatRecordAction.run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) + when(heartbeatRecordAction.run( + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -201,8 +233,20 @@ public void testQueryChangeStreamWithHeartbeatRecord() { partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); assertEquals(ProcessContinuation.stop(), result); - verify(heartbeatRecordAction).run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); - verify(heartbeatRecordAction).run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); + verify(heartbeatRecordAction) + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); + verify(heartbeatRecordAction) + .run( + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); @@ -232,10 +276,18 @@ public void testQueryChangeStreamWithChildPartitionsRecord() { when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) 
.thenReturn(Arrays.asList(record1, record2)); when(childPartitionsRecordAction.run( - eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) .thenReturn(Optional.empty()); when(childPartitionsRecordAction.run( - eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator))) + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -245,9 +297,19 @@ public void testQueryChangeStreamWithChildPartitionsRecord() { assertEquals(ProcessContinuation.stop(), result); verify(childPartitionsRecordAction) - .run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); verify(childPartitionsRecordAction) - .run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); + .run( + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); @@ -281,7 +343,11 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStart() { when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); when(childPartitionsRecordAction.run( - eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), 
eq(watermarkEstimator))) + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -291,9 +357,19 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStart() { assertEquals(ProcessContinuation.stop(), result); verify(childPartitionsRecordAction) - .run(eq(partition), eq(record1), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); verify(childPartitionsRecordAction) - .run(eq(partition), eq(record2), eq(restrictionTracker), any(RestrictionInterrupter.class), eq(watermarkEstimator)); + .run( + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); From 8fd02f127b323b9488d610963435cfccd689bb8a Mon Sep 17 00:00:00 2001 From: Dedo Cibula Date: Tue, 10 Dec 2024 13:47:02 -0800 Subject: [PATCH 4/5] added requested comments and changes --- .../action/ChildPartitionsRecordAction.java | 5 ++++- .../action/DataChangeRecordAction.java | 6 ++++-- .../action/HeartbeatRecordAction.java | 4 +++- .../action/QueryChangeStreamAction.java | 6 ++++++ .../restriction/RestrictionInterrupter.java | 19 ++++++++++--------- .../RestrictionInterrupterTest.java | 5 +++++ 6 files changed, 32 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java index 291213e93ada..715c18098c00 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java @@ -95,11 +95,14 @@ public class ChildPartitionsRecordAction { * @param record the change stream child partition record received * @param tracker the restriction tracker of the {@link * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF + * @param interrupter the restriction interrupter suggesting early termination of the processing * @param watermarkEstimator the watermark estimator of the {@link * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF * @return {@link Optional#empty()} if the caller can continue processing more records. A non * empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable - * to claim the {@link ChildPartitionsRecord} timestamp + * to claim the {@link ChildPartitionsRecord} timestamp. A non empty {@link Optional} with + * {@link ProcessContinuation#resume()} if this function should commit what has already been + * processed and resume. 
*/ @VisibleForTesting public Optional run( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java index 9913d854c5fa..555b1fefbebc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java @@ -21,7 +21,6 @@ import java.util.Optional; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn; import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; @@ -69,12 +68,15 @@ public DataChangeRecordAction(ThroughputEstimator throughputEs * @param partition the current partition being processed * @param record the change stream data record received * @param tracker the restriction tracker of the {@link ReadChangeStreamPartitionDoFn} SDF + * @param interrupter the restriction interrupter suggesting early termination of the processing * @param outputReceiver the output receiver of the {@link ReadChangeStreamPartitionDoFn} SDF * @param watermarkEstimator the watermark estimator of the {@link ReadChangeStreamPartitionDoFn} * SDF * @return {@link Optional#empty()} if the caller can continue processing more records. 
A non * empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable - * to claim the {@link ChildPartitionsRecord} timestamp + * to claim the {@link DataChangeRecord} timestamp. A non empty {@link Optional} with {@link + * ProcessContinuation#resume()} if this function should commit what has already been + * processed and resume. */ @VisibleForTesting public Optional run( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java index 446907b9024a..0937e896fbf1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java @@ -57,7 +57,9 @@ public class HeartbeatRecordAction { * not. If the {@link Optional} returned is empty, it means that the calling function can continue * with the processing. If an {@link Optional} of {@link ProcessContinuation#stop()} is returned, * it means that this function was unable to claim the timestamp of the {@link HeartbeatRecord}, - * so the caller should stop. + * so the caller should stop. If an {@link Optional} of {@link ProcessContinuation#resume()} is + * returned, it means that this function should not attempt to claim further timestamps of the + * {@link HeartbeatRecord}, but instead should commit what it has processed so far. * *

When processing the {@link HeartbeatRecord} the following procedure is applied: * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 67c71b0b46d0..6edbd544a37c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -63,6 +63,12 @@ public class QueryChangeStreamAction { private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class); private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5); + /* + * Corresponds to the best-effort timeout in case the restriction tracker cannot split the processing + * interval before the hard deadline. When reached, it will ensure that the already processed timestamps + * will be committed instead of thrown away (DEADLINE_EXCEEDED). The value should be less than + * the RetrySetting RPC timeout setting of SpannerIO#ReadChangeStream. 
+ */ private static final Duration RESTRICTION_TRACKER_TIMEOUT = Duration.standardSeconds(40); private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid"; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java index 81696550747a..37e91911867a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java @@ -19,18 +19,17 @@ import java.util.function.Supplier; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; /** An interrupter for restriction tracker of type T. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class RestrictionInterrupter { - private T lastAttemptedPosition; + private @Nullable T lastAttemptedPosition; private Supplier timeSupplier; - private Instant softDeadline; + private final Instant softDeadline; private boolean hasInterrupted = true; /** @@ -67,7 +66,7 @@ void setTimeSupplier(Supplier timeSupplier) { * @return {@code true} if the position processing should continue, {@code false} if the soft * deadline has been reached and we have fully processed the previous position. 
*/ - public boolean tryInterrupt(T position) { + public boolean tryInterrupt(@NonNull T position) { if (hasInterrupted) { return true; } @@ -75,10 +74,12 @@ public boolean tryInterrupt(T position) { lastAttemptedPosition = position; return false; } - - hasInterrupted |= - timeSupplier.get().isAfter(softDeadline) && !position.equals(lastAttemptedPosition); + if (position.equals(lastAttemptedPosition)) { + return false; + } lastAttemptedPosition = position; + + hasInterrupted |= timeSupplier.get().isAfter(softDeadline); return hasInterrupted; } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java index dc76465dbf39..6d376ec528ba 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java @@ -38,11 +38,15 @@ public void testTryInterrupt() { interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(30)); assertFalse(interrupter.tryInterrupt(3)); interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); + // Though the deadline has passed same position as previously accepted is not interrupted. assertFalse(interrupter.tryInterrupt(3)); assertTrue(interrupter.tryInterrupt(4)); assertTrue(interrupter.tryInterrupt(5)); interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50)); assertTrue(interrupter.tryInterrupt(5)); + // Even with non-monotonic clock the interrupter will now always interrupt. 
+ interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); + assertTrue(interrupter.tryInterrupt(5)); } @Test @@ -52,6 +56,7 @@ public void testTryInterruptNoPreviousPosition() { () -> Instant.ofEpochSecond(0), Duration.standardSeconds(30)); interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); assertFalse(interrupter.tryInterrupt(1)); + // Though the deadline has passed same position as previously accepted is not interrupted. assertFalse(interrupter.tryInterrupt(1)); assertTrue(interrupter.tryInterrupt(2)); interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50)); From f6798653878fb8764d5cd3c501f1b5299d95735c Mon Sep 17 00:00:00 2001 From: Dedo Cibula Date: Tue, 10 Dec 2024 13:49:41 -0800 Subject: [PATCH 5/5] added spotless check --- .../changestreams/action/ChildPartitionsRecordAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java index 715c18098c00..14b6b2e2453a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java @@ -101,7 +101,7 @@ public class ChildPartitionsRecordAction { * @return {@link Optional#empty()} if the caller can continue processing more records. A non * empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable * to claim the {@link ChildPartitionsRecord} timestamp. 
A non empty {@link Optional} with - * {@link ProcessContinuation#resume()} if this function should commit what has already been + * {@link ProcessContinuation#resume()} if this function should commit what has already been * processed and resume. */ @VisibleForTesting