From 02bdee7077a845f6ca72f107d16af5c64f39803e Mon Sep 17 00:00:00 2001 From: Dedo Cibula Date: Mon, 9 Sep 2024 18:35:04 -0700 Subject: [PATCH] added interruptible interface to the read change stream partition record tracker --- .../action/ChildPartitionsRecordAction.java | 9 ++++ .../action/DataChangeRecordAction.java | 13 ++++- .../action/HeartbeatRecordAction.java | 6 +++ .../action/QueryChangeStreamAction.java | 7 +++ .../restriction/Interruptible.java | 36 ++++++++++++++ ...ReadChangeStreamPartitionRangeTracker.java | 49 ++++++++++++++++++- .../ChildPartitionsRecordActionTest.java | 37 ++++++++++++-- .../action/DataChangeRecordActionTest.java | 28 +++++++++-- .../action/HeartbeatRecordActionTest.java | 25 ++++++++-- .../action/QueryChangeStreamActionTest.java | 6 +-- .../ReadChangeStreamPartitionDoFnTest.java | 6 +-- ...ChangeStreamPartitionRangeTrackerTest.java | 44 +++++++++++++++++ 12 files changed, 246 insertions(+), 20 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java index ada794d20c3b..fb139aa0f2c0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -113,6 +114,14 @@ public Optional run( final Timestamp startTimestamp = record.getStartTimestamp(); final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime()); + if (tracker instanceof Interruptible + && !((Interruptible) tracker).shouldContinue(startTimestamp)) { + LOG.debug( + "[{}] Soft deadline reached with child partitions record at {}, rescheduling", + token, + startTimestamp); + return Optional.of(ProcessContinuation.resume()); + } if (!tracker.tryClaim(startTimestamp)) { LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, startTimestamp); return Optional.of(ProcessContinuation.stop()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java index 4ceda8afb3e6..60230ae72f54 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import 
org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; @@ -43,7 +44,9 @@ public class DataChangeRecordAction { private static final Logger LOG = LoggerFactory.getLogger(DataChangeRecordAction.class); private final ThroughputEstimator throughputEstimator; - /** @param throughputEstimator an estimator to calculate local throughput of this action. */ + /** + * @param throughputEstimator an estimator to calculate local throughput of this action. + */ public DataChangeRecordAction(ThroughputEstimator throughputEstimator) { this.throughputEstimator = throughputEstimator; } @@ -88,6 +91,14 @@ public Optional run( final Timestamp commitTimestamp = record.getCommitTimestamp(); final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime()); + if (tracker instanceof Interruptible + && !((Interruptible) tracker).shouldContinue(commitTimestamp)) { + LOG.debug( + "[{}] Soft deadline reached with data change record at {}, rescheduling", + token, + commitTimestamp); + return Optional.of(ProcessContinuation.resume()); + } if (!tracker.tryClaim(commitTimestamp)) { LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, commitTimestamp); return Optional.of(ProcessContinuation.stop()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java index 83a232fe2093..37bb572c36c7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -79,6 +80,11 @@ public Optional run( final Timestamp timestamp = record.getTimestamp(); final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime()); + if (tracker instanceof Interruptible && !((Interruptible) tracker).shouldContinue(timestamp)) { + LOG.debug( + "[{}] Soft deadline reached with heartbeat record at {}, rescheduling", token, timestamp); + return Optional.of(ProcessContinuation.resume()); + } if (!tracker.tryClaim(timestamp)) { LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, timestamp); return Optional.of(ProcessContinuation.stop()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 92285946e56f..cba439528a6b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -62,6 +63,7 @@ public class QueryChangeStreamAction { private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class); private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5); + private static final Duration RESTRICTION_TRACKER_TIMEOUT = Duration.standardSeconds(40); private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid"; private final ChangeStreamDao changeStreamDao; @@ -164,6 +166,11 @@ public ProcessContinuation run( new IllegalStateException( "Partition " + token + " not found in metadata table")); + // Set the soft timeout to commit the work if any records have been processed. + if (tracker instanceof Interruptible) { + ((Interruptible) tracker).setSoftTimeout(RESTRICTION_TRACKER_TIMEOUT); + } + try (ChangeStreamResultSet resultSet = changeStreamDao.changeStreamQuery( token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java new file mode 100644 index 000000000000..b96676798af2 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/Interruptible.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction; + +import com.google.cloud.Timestamp; +import org.joda.time.Duration; + +/** An interruptible interface for timestamp restriction tracker. */ +public interface Interruptible { + /** Sets a soft timeout from now for processing new timestamps. */ + public void setSoftTimeout(Duration duration); + + /** + * Returns true if the timestamp tracker can process new timestamps or false if it should + * interrupt processing. + * + * @return {@code true} if the position processing should continue, {@code false} if the soft + * deadline has been reached and we have fully processed the previous position. 
+ */ + public boolean shouldContinue(Timestamp position); +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java index 177a1b7494af..a8555b526c7d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java @@ -22,6 +22,8 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; /** * This restriction tracker delegates most of its behavior to an internal {@link @@ -34,9 +36,12 @@ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) -public class ReadChangeStreamPartitionRangeTracker extends TimestampRangeTracker { +public class ReadChangeStreamPartitionRangeTracker extends TimestampRangeTracker + implements Interruptible { private final PartitionMetadata partition; + private Instant softDeadline; + private boolean continueProcessing = true; /** * Receives the partition that will be queried and the timestamp range that belongs to it. @@ -49,6 +54,48 @@ public ReadChangeStreamPartitionRangeTracker(PartitionMetadata partition, Timest this.partition = partition; } + /** + * Sets a soft timeout from now for processing new positions. After the timeout the shouldContinue + * will start returning false indicating an early exit from processing. 
+ */ + @Override + public void setSoftTimeout(Duration duration) { + softDeadline = new Instant(timeSupplier.get().toSqlTimestamp()).plus(duration); + continueProcessing = true; + } + + /** + * Returns true if the restriction tracker can claim new positions. + * + *

<p>If soft timeout isn't set always returns true. Otherwise: + * + * <ol> + *
<li>If soft deadline hasn't been reached always returns true. + *
<li>If soft deadline has been reached but we haven't processed any positions returns true. + *
<li>If soft deadline has been reached but the new position is the same as the last attempted + * position returns true. + *
<li>If soft deadline has been reached and the new position differs from the last attempted + * position returns false. + * </ol>
+ * + * @return {@code true} if the position processing should continue, {@code false} if the soft + * deadline has been reached and we have fully processed the previous position. + */ + @Override + public boolean shouldContinue(Timestamp position) { + if (!continueProcessing) { + return false; + } + if (softDeadline == null || lastAttemptedPosition == null) { + return true; + } + + continueProcessing &= + new Instant(timeSupplier.get().toSqlTimestamp()).isBefore(softDeadline) + || position.equals(lastAttemptedPosition); + return continueProcessing; + } + /** * Attempts to claim the given position. * diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java index 03d390ea0d5d..b1c57733105c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java @@ -38,11 +38,10 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestTransactionAnswer; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; -import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -54,7 +53,7 @@ public class ChildPartitionsRecordActionTest { private InTransactionContext transaction; private ChangeStreamMetrics metrics; private ChildPartitionsRecordAction action; - private RestrictionTracker tracker; + private ReadChangeStreamPartitionRangeTracker tracker; private ManualWatermarkEstimator watermarkEstimator; @Before @@ -63,7 +62,7 @@ public void setUp() { transaction = mock(InTransactionContext.class); metrics = mock(ChangeStreamMetrics.class); action = new ChildPartitionsRecordAction(dao, metrics); - tracker = mock(RestrictionTracker.class); + tracker = mock(ReadChangeStreamPartitionRangeTracker.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); when(dao.runInTransaction(any(), anyObject())) @@ -88,6 +87,7 @@ public void testRestrictionClaimedAndIsSplitCase() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition("childPartition1")).thenReturn(null); when(transaction.getPartition("childPartition2")).thenReturn(null); @@ -139,6 +139,7 @@ public void testRestrictionClaimedAnsIsSplitCaseAndChildExists() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition("childPartition1")).thenReturn(mock(Struct.class)); 
when(transaction.getPartition("childPartition2")).thenReturn(mock(Struct.class)); @@ -169,6 +170,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildNotExists() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition(childPartitionToken)).thenReturn(null); @@ -209,6 +211,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildExists() { when(partition.getEndTimestamp()).thenReturn(endTimestamp); when(partition.getHeartbeatMillis()).thenReturn(heartbeat); when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(true); when(transaction.getPartition(childPartitionToken)).thenReturn(mock(Struct.class)); @@ -234,6 +237,7 @@ public void testRestrictionNotClaimed() { new ChildPartition("childPartition2", partitionToken)), null); when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(true); when(tracker.tryClaim(startTimestamp)).thenReturn(false); final Optional maybeContinuation = @@ -243,4 +247,29 @@ public void testRestrictionNotClaimed() { verify(watermarkEstimator, never()).setWatermark(any()); verify(dao, never()).insert(any()); } + + @Test + public void testSoftDeadlineReached() { + final String partitionToken = "partitionToken"; + final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L); + final PartitionMetadata partition = mock(PartitionMetadata.class); + final ChildPartitionsRecord record = + new ChildPartitionsRecord( + startTimestamp, + "recordSequence", + Arrays.asList( + new ChildPartition("childPartition1", partitionToken), + new ChildPartition("childPartition2", 
partitionToken)), + null); + when(partition.getPartitionToken()).thenReturn(partitionToken); + when(tracker.shouldContinue(startTimestamp)).thenReturn(false); + when(tracker.tryClaim(startTimestamp)).thenReturn(true); + + final Optional maybeContinuation = + action.run(partition, record, tracker, watermarkEstimator); + + assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); + verify(watermarkEstimator, never()).setWatermark(any()); + verify(dao, never()).insert(any()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java index ac8d48725299..7522cb60ebac 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java @@ -30,11 +30,10 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -43,7 +42,7 @@ public class DataChangeRecordActionTest { 
private DataChangeRecordAction action; private PartitionMetadata partition; - private RestrictionTracker tracker; + private ReadChangeStreamPartitionRangeTracker tracker; private OutputReceiver outputReceiver; private ManualWatermarkEstimator watermarkEstimator; private BytesThroughputEstimator throughputEstimator; @@ -53,7 +52,7 @@ public void setUp() { throughputEstimator = mock(BytesThroughputEstimator.class); action = new DataChangeRecordAction(throughputEstimator); partition = mock(PartitionMetadata.class); - tracker = mock(RestrictionTracker.class); + tracker = mock(ReadChangeStreamPartitionRangeTracker.class); outputReceiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -65,6 +64,7 @@ public void testRestrictionClaimed() { final Instant instant = new Instant(timestamp.toSqlTimestamp().getTime()); final DataChangeRecord record = mock(DataChangeRecord.class); when(record.getCommitTimestamp()).thenReturn(timestamp); + when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); @@ -83,6 +83,7 @@ public void testRestrictionNotClaimed() { final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); final DataChangeRecord record = mock(DataChangeRecord.class); when(record.getCommitTimestamp()).thenReturn(timestamp); + when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(false); when(partition.getPartitionToken()).thenReturn(partitionToken); @@ -94,4 +95,23 @@ public void testRestrictionNotClaimed() { verify(watermarkEstimator, never()).setWatermark(any()); verify(throughputEstimator, never()).update(any(), any()); } + + @Test + public void testSoftDeadlineReached() { + final String partitionToken = "partitionToken"; + final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + final DataChangeRecord record = mock(DataChangeRecord.class); + 
when(record.getCommitTimestamp()).thenReturn(timestamp); + when(tracker.shouldContinue(timestamp)).thenReturn(false); + when(tracker.tryClaim(timestamp)).thenReturn(true); + when(partition.getPartitionToken()).thenReturn(partitionToken); + + final Optional maybeContinuation = + action.run(partition, record, tracker, outputReceiver, watermarkEstimator); + + assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); + verify(outputReceiver, never()).outputWithTimestamp(any(), any()); + verify(watermarkEstimator, never()).setWatermark(any()); + verify(throughputEstimator, never()).update(any(), any()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java index 77333bbbc96e..7f0be39492c3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java @@ -29,10 +29,9 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -41,7 +40,7 
@@ public class HeartbeatRecordActionTest { private HeartbeatRecordAction action; private PartitionMetadata partition; - private RestrictionTracker tracker; + private ReadChangeStreamPartitionRangeTracker tracker; private ManualWatermarkEstimator watermarkEstimator; @Before @@ -49,7 +48,7 @@ public void setUp() { final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class); action = new HeartbeatRecordAction(metrics); partition = mock(PartitionMetadata.class); - tracker = mock(RestrictionTracker.class); + tracker = mock(ReadChangeStreamPartitionRangeTracker.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -58,6 +57,7 @@ public void testRestrictionClaimed() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(true); when(partition.getPartitionToken()).thenReturn(partitionToken); @@ -73,6 +73,7 @@ public void testRestrictionNotClaimed() { final String partitionToken = "partitionToken"; final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + when(tracker.shouldContinue(timestamp)).thenReturn(true); when(tracker.tryClaim(timestamp)).thenReturn(false); when(partition.getPartitionToken()).thenReturn(partitionToken); @@ -82,4 +83,20 @@ public void testRestrictionNotClaimed() { assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); } + + @Test + public void testSoftDeadlineReached() { + final String partitionToken = "partitionToken"; + final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + + when(tracker.shouldContinue(timestamp)).thenReturn(false); + when(tracker.tryClaim(timestamp)).thenReturn(true); + when(partition.getPartitionToken()).thenReturn(partitionToken); + + final Optional maybeContinuation = + action.run(partition, new HeartbeatRecord(timestamp, null), tracker, 
watermarkEstimator); + + assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); + verify(watermarkEstimator, never()).setWatermark(any()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index bf7b0adfd475..1734f8fbfda5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -40,12 +40,12 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -65,7 +65,7 @@ public class QueryChangeStreamActionTest { private PartitionMetadata partition; private ChangeStreamMetrics metrics; private TimestampRange restriction; - private RestrictionTracker restrictionTracker; + private ReadChangeStreamPartitionRangeTracker restrictionTracker; 
private OutputReceiver outputReceiver; private ChangeStreamRecordMapper changeStreamRecordMapper; private PartitionMetadataMapper partitionMetadataMapper; @@ -110,7 +110,7 @@ public void setUp() throws Exception { .setScheduledAt(Timestamp.now()) .build(); restriction = mock(TimestampRange.class); - restrictionTracker = mock(RestrictionTracker.class); + restrictionTracker = mock(ReadChangeStreamPartitionRangeTracker.class); outputReceiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); bundleFinalizer = new BundleFinalizerStub(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index 538bdf768664..920786b72764 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -41,12 +41,12 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; -import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.joda.time.Instant; import org.junit.Before; @@ -66,7 +66,7 @@ public class ReadChangeStreamPartitionDoFnTest { private ReadChangeStreamPartitionDoFn doFn; private PartitionMetadata partition; private TimestampRange restriction; - private RestrictionTracker tracker; + private ReadChangeStreamPartitionRangeTracker tracker; private OutputReceiver receiver; private ManualWatermarkEstimator watermarkEstimator; private BundleFinalizer bundleFinalizer; @@ -107,7 +107,7 @@ public void setUp() { .setScheduledAt(Timestamp.now()) .build(); restriction = mock(TimestampRange.class); - tracker = mock(RestrictionTracker.class); + tracker = mock(ReadChangeStreamPartitionRangeTracker.class); receiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); bundleFinalizer = mock(BundleFinalizer.class); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java index 7c4ae2c9b8b4..341f28215373 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java @@ -27,6 +27,7 @@ import com.google.cloud.Timestamp; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.joda.time.Duration; import org.junit.Before; import org.junit.Test; @@ -60,4 +61,47 @@ 
public void testTrySplitReturnsNullForInitialPartition() { assertNull(tracker.trySplit(0.0D)); } + + @Test + public void testShouldContinueWithoutTimeout() { + assertEquals(range, tracker.currentRestriction()); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(20L))); + assertFalse(tracker.tryClaim(Timestamp.ofTimeMicroseconds(20L))); + } + + @Test + public void testShouldContinueWithTimeout() { + assertEquals(range, tracker.currentRestriction()); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(10L, 0)); + tracker.setSoftTimeout(Duration.standardSeconds(30)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(15L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(10L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(10L))); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(20L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(30L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(11L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(11L))); + tracker.setTimeSupplier(() -> 
Timestamp.ofTimeSecondsAndNanos(39L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(40L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); + tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(50L, 0)); + assertTrue(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(16L))); + assertTrue(tracker.tryClaim(Timestamp.ofTimeMicroseconds(16L))); + assertFalse(tracker.shouldContinue(Timestamp.ofTimeMicroseconds(19L))); + } }