diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java index ada794d20c3b..14b6b2e2453a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -94,17 +95,21 @@ public class ChildPartitionsRecordAction { * @param record the change stream child partition record received * @param tracker the restriction tracker of the {@link * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF + * @param interrupter the restriction interrupter suggesting early termination of the processing * @param watermarkEstimator the watermark estimator of the {@link * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF * @return {@link Optional#empty()} if the caller can continue processing more records. A non * empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable - * to claim the {@link ChildPartitionsRecord} timestamp + * to claim the {@link ChildPartitionsRecord} timestamp. A non empty {@link Optional} with + * {@link ProcessContinuation#resume()} if this function should commit what has already been + * processed and resume. */ @VisibleForTesting public Optional run( PartitionMetadata partition, ChildPartitionsRecord record, RestrictionTracker tracker, + RestrictionInterrupter interrupter, ManualWatermarkEstimator watermarkEstimator) { final String token = partition.getPartitionToken(); @@ -113,6 +118,13 @@ public Optional run( final Timestamp startTimestamp = record.getStartTimestamp(); final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime()); + if (interrupter.tryInterrupt(startTimestamp)) { + LOG.debug( + "[{}] Soft deadline reached with child partitions record at {}, rescheduling", + token, + startTimestamp); + return Optional.of(ProcessContinuation.resume()); + } if (!tracker.tryClaim(startTimestamp)) { LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, startTimestamp); return Optional.of(ProcessContinuation.stop()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java index 4ceda8afb3e6..555b1fefbebc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java @@ -21,9 +21,9 @@ import java.util.Optional; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn; import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; @@ -68,18 +68,22 @@ public DataChangeRecordAction(ThroughputEstimator throughputEs * @param partition the current partition being processed * @param record the change stream data record received * @param tracker the restriction tracker of the {@link ReadChangeStreamPartitionDoFn} SDF + * @param interrupter the restriction interrupter suggesting early termination of the processing * @param outputReceiver the output receiver of the {@link ReadChangeStreamPartitionDoFn} SDF * @param watermarkEstimator the watermark estimator of the {@link ReadChangeStreamPartitionDoFn} * SDF * @return {@link Optional#empty()} if the caller can continue processing more records. A non * empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable - * to claim the {@link ChildPartitionsRecord} timestamp + * to claim the {@link DataChangeRecord} timestamp. A non empty {@link Optional} with {@link + * ProcessContinuation#resume()} if this function should commit what has already been + * processed and resume. */ @VisibleForTesting public Optional run( PartitionMetadata partition, DataChangeRecord record, RestrictionTracker tracker, + RestrictionInterrupter interrupter, OutputReceiver outputReceiver, ManualWatermarkEstimator watermarkEstimator) { @@ -88,6 +92,13 @@ public Optional run( final Timestamp commitTimestamp = record.getCommitTimestamp(); final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime()); + if (interrupter.tryInterrupt(commitTimestamp)) { + LOG.debug( + "[{}] Soft deadline reached with data change record at {}, rescheduling", + token, + commitTimestamp); + return Optional.of(ProcessContinuation.resume()); + } if (!tracker.tryClaim(commitTimestamp)) { LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, commitTimestamp); return Optional.of(ProcessContinuation.stop()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java index 83a232fe2093..0937e896fbf1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -56,7 +57,9 @@ public class HeartbeatRecordAction { * not. If the {@link Optional} returned is empty, it means that the calling function can continue * with the processing. If an {@link Optional} of {@link ProcessContinuation#stop()} is returned, * it means that this function was unable to claim the timestamp of the {@link HeartbeatRecord}, - * so the caller should stop. + * so the caller should stop. If an {@link Optional} of {@link ProcessContinuation#resume()} is + * returned, it means that this function should not attempt to claim further timestamps of the + * {@link HeartbeatRecord}, but instead should commit what it has processed so far. * *

When processing the {@link HeartbeatRecord} the following procedure is applied: * @@ -72,6 +75,7 @@ public Optional run( PartitionMetadata partition, HeartbeatRecord record, RestrictionTracker tracker, + RestrictionInterrupter interrupter, ManualWatermarkEstimator watermarkEstimator) { final String token = partition.getPartitionToken(); @@ -79,6 +83,11 @@ public Optional run( final Timestamp timestamp = record.getTimestamp(); final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime()); + if (interrupter.tryInterrupt(timestamp)) { + LOG.debug( + "[{}] Soft deadline reached with heartbeat record at {}, rescheduling", token, timestamp); + return Optional.of(ProcessContinuation.resume()); + } if (!tracker.tryClaim(timestamp)) { LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, timestamp); return Optional.of(ProcessContinuation.stop()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 92285946e56f..6edbd544a37c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -62,6 +63,13 @@ public class QueryChangeStreamAction { private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class); private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5); + /* + * Corresponds to the best effort timeout in case the restriction tracker cannot split the processing + * interval before the hard deadline. When reached it will assure that the already processed timestamps + * will be committed instead of thrown away (DEADLINE_EXCEEDED). The value should be less than + * the RetrySetting RPC timeout setting of SpannerIO#ReadChangeStream. + */ + private static final Duration RESTRICTION_TRACKER_TIMEOUT = Duration.standardSeconds(40); private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid"; private final ChangeStreamDao changeStreamDao; @@ -164,6 +172,10 @@ public ProcessContinuation run( new IllegalStateException( "Partition " + token + " not found in metadata table")); + // Interrupter with soft timeout to commit the work if any records have been processed. + RestrictionInterrupter interrupter = + RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT); + try (ChangeStreamResultSet resultSet = changeStreamDao.changeStreamQuery( token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) { @@ -182,16 +194,25 @@ public ProcessContinuation run( updatedPartition, (DataChangeRecord) record, tracker, + interrupter, receiver, watermarkEstimator); } else if (record instanceof HeartbeatRecord) { maybeContinuation = heartbeatRecordAction.run( - updatedPartition, (HeartbeatRecord) record, tracker, watermarkEstimator); + updatedPartition, + (HeartbeatRecord) record, + tracker, + interrupter, + watermarkEstimator); } else if (record instanceof ChildPartitionsRecord) { maybeContinuation = childPartitionsRecordAction.run( - updatedPartition, (ChildPartitionsRecord) record, tracker, watermarkEstimator); + updatedPartition, + (ChildPartitionsRecord) record, + tracker, + interrupter, + watermarkEstimator); } else { LOG.error("[{}] Unknown record type {}", token, record.getClass()); throw new IllegalArgumentException("Unknown record type " + record.getClass()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java new file mode 100644 index 000000000000..37e91911867a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction; + +import java.util.function.Supplier; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** An interrupter for restriction tracker of type T. */ +public class RestrictionInterrupter { + private @Nullable T lastAttemptedPosition; + + private Supplier timeSupplier; + private final Instant softDeadline; + private boolean hasInterrupted = true; + + /** + * Sets a soft timeout from now for processing new positions. After the timeout the tryInterrupt + * will start returning true indicating an early exit from processing. + */ + public static RestrictionInterrupter withSoftTimeout(Duration timeout) { + return new RestrictionInterrupter(() -> Instant.now(), timeout); + } + + RestrictionInterrupter(Supplier timeSupplier, Duration timeout) { + this.timeSupplier = timeSupplier; + this.softDeadline = this.timeSupplier.get().plus(timeout); + hasInterrupted = false; + } + + @VisibleForTesting + void setTimeSupplier(Supplier timeSupplier) { + this.timeSupplier = timeSupplier; + } + + /** + * Returns true if the restriction tracker should be interrupted in claiming new positions. + * + *

    + *
  1. If soft deadline hasn't been reached always returns false. + *
  2. If soft deadline has been reached but we haven't processed any positions returns false. + *
  3. If soft deadline has been reached but the new position is the same as the last attempted + * position returns false. + *
  4. If soft deadline has been reached and the new position differs from the last attempted + * position returns true. + *
+ * + * @return {@code true} if the position processing should continue, {@code false} if the soft + * deadline has been reached and we have fully processed the previous position. + */ + public boolean tryInterrupt(@NonNull T position) { + if (hasInterrupted) { + return true; + } + if (lastAttemptedPosition == null) { + lastAttemptedPosition = position; + return false; + } + if (position.equals(lastAttemptedPosition)) { + return false; + } + lastAttemptedPosition = position; + + hasInterrupted |= timeSupplier.get().isAfter(softDeadline); + return hasInterrupted; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java index 03d390ea0d5d..5815bf0c6fdd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestTransactionAnswer; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; @@ -55,6 +56,7 @@ public class ChildPartitionsRecordActionTest { private ChangeStreamMetrics metrics; private ChildPartitionsRecordAction action; private RestrictionTracker tracker; + private RestrictionInterrupter interrupter; private ManualWatermarkEstimator watermarkEstimator; @Before @@ -64,6 +66,7 @@ public void setUp() { metrics = mock(ChangeStreamMetrics.class); action = new ChildPartitionsRecordAction(dao, metrics); tracker = mock(RestrictionTracker.class); + interrupter = mock(RestrictionInterrupter.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); when(dao.runInTransaction(any(), anyObject())) @@ -93,7 +96,7 @@ public void testRestrictionClaimedAndIsSplitCase() { when(transaction.getPartition("childPartition2")).thenReturn(null); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -144,7 +147,7 @@ public void testRestrictionClaimedAnsIsSplitCaseAndChildExists() { when(transaction.getPartition("childPartition2")).thenReturn(mock(Struct.class)); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -173,7 +176,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildNotExists() { when(transaction.getPartition(childPartitionToken)).thenReturn(null); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -213,7 +216,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildExists() { when(transaction.getPartition(childPartitionToken)).thenReturn(mock(Struct.class)); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime())); @@ -237,10 +240,35 @@ public void testRestrictionNotClaimed() { when(tracker.tryClaim(startTimestamp)).thenReturn(false); final Optional maybeContinuation = - action.run(partition, record, tracker, watermarkEstimator); + action.run(partition, record, tracker, interrupter, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); verify(dao, never()).insert(any()); } + + @Test + public void testSoftDeadlineReached() { + final String partitionToken = "partitionToken"; + final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L); + final PartitionMetadata partition = mock(PartitionMetadata.class); + final ChildPartitionsRecord record = + new ChildPartitionsRecord( + startTimestamp, + "recordSequence", + Arrays.asList( + new ChildPartition("childPartition1", partitionToken), + new ChildPartition("childPartition2", partitionToken)), + null); + when(partition.getPartitionToken()).thenReturn(partitionToken); + when(interrupter.tryInterrupt(startTimestamp)).thenReturn(true); + when(tracker.tryClaim(startTimestamp)).thenReturn(true); + + final Optional maybeContinuation = + action.run(partition, record, tracker, interrupter, watermarkEstimator); + + assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); + verify(watermarkEstimator, never()).setWatermark(any()); + verify(dao, never()).insert(any()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java index ac8d48725299..6569f810812c 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; @@ -44,6 +45,7 @@ public class DataChangeRecordActionTest { private DataChangeRecordAction action; private PartitionMetadata partition; private RestrictionTracker tracker; + private RestrictionInterrupter interrupter; private OutputReceiver outputReceiver; private ManualWatermarkEstimator watermarkEstimator; private BytesThroughputEstimator throughputEstimator; @@ -54,6 +56,7 @@ public void setUp() { action = new DataChangeRecordAction(throughputEstimator); partition = mock(PartitionMetadata.class); tracker = mock(RestrictionTracker.class); + interrupter = mock(RestrictionInterrupter.class); outputReceiver = mock(OutputReceiver.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -69,7 +72,7 @@ public void testRestrictionClaimed() { when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, record, tracker, outputReceiver, watermarkEstimator); + action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(outputReceiver).outputWithTimestamp(record, instant); @@ -87,11 +90,30 @@ public void testRestrictionNotClaimed() { when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, record, tracker, outputReceiver, watermarkEstimator); + action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(outputReceiver, never()).outputWithTimestamp(any(), any()); verify(watermarkEstimator, never()).setWatermark(any()); verify(throughputEstimator, never()).update(any(), any()); } + + @Test + public void testSoftDeadlineReached() { + final String partitionToken = "partitionToken"; + final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + final DataChangeRecord record = mock(DataChangeRecord.class); + when(record.getCommitTimestamp()).thenReturn(timestamp); + when(interrupter.tryInterrupt(timestamp)).thenReturn(true); + when(tracker.tryClaim(timestamp)).thenReturn(true); + when(partition.getPartitionToken()).thenReturn(partitionToken); + + final Optional maybeContinuation = + action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator); + + assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); + verify(outputReceiver, never()).outputWithTimestamp(any(), any()); + verify(watermarkEstimator, never()).setWatermark(any()); + verify(throughputEstimator, never()).update(any(), any()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java index 77333bbbc96e..56d1825c8a18 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; @@ -42,6 +43,7 @@ public class HeartbeatRecordActionTest { private HeartbeatRecordAction action; private PartitionMetadata partition; private RestrictionTracker tracker; + private RestrictionInterrupter interrupter; private ManualWatermarkEstimator watermarkEstimator; @Before @@ -50,6 +52,7 @@ public void setUp() { action = new HeartbeatRecordAction(metrics); partition = mock(PartitionMetadata.class); tracker = mock(RestrictionTracker.class); + interrupter = mock(RestrictionInterrupter.class); watermarkEstimator = mock(ManualWatermarkEstimator.class); } @@ -62,7 +65,12 @@ public void testRestrictionClaimed() { when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator); + action.run( + partition, + new HeartbeatRecord(timestamp, null), + tracker, + interrupter, + watermarkEstimator); assertEquals(Optional.empty(), maybeContinuation); verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime())); @@ -77,9 +85,35 @@ public void testRestrictionNotClaimed() { when(partition.getPartitionToken()).thenReturn(partitionToken); final Optional maybeContinuation = - action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator); + action.run( + partition, + new HeartbeatRecord(timestamp, null), + tracker, + interrupter, + watermarkEstimator); assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation); verify(watermarkEstimator, never()).setWatermark(any()); } + + @Test + public void testSoftDeadlineReached() { + final String partitionToken = "partitionToken"; + final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L); + + when(interrupter.tryInterrupt(timestamp)).thenReturn(true); + when(tracker.tryClaim(timestamp)).thenReturn(true); + when(partition.getPartitionToken()).thenReturn(partitionToken); + + final Optional maybeContinuation = + action.run( + partition, + new HeartbeatRecord(timestamp, null), + tracker, + interrupter, + watermarkEstimator); + + assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation); + verify(watermarkEstimator, never()).setWatermark(any()); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index bf7b0adfd475..c73a62a812bd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -40,6 +41,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -144,10 +146,20 @@ public void testQueryChangeStreamWithDataChangeRecord() { when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); when(dataChangeRecordAction.run( - partition, record1, restrictionTracker, outputReceiver, watermarkEstimator)) + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(outputReceiver), + eq(watermarkEstimator))) .thenReturn(Optional.empty()); when(dataChangeRecordAction.run( - partition, record2, restrictionTracker, outputReceiver, watermarkEstimator)) + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(outputReceiver), + eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -157,13 +169,25 @@ public void testQueryChangeStreamWithDataChangeRecord() { assertEquals(ProcessContinuation.stop(), result); verify(dataChangeRecordAction) - .run(partition, record1, restrictionTracker, outputReceiver, watermarkEstimator); + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(outputReceiver), + eq(watermarkEstimator)); verify(dataChangeRecordAction) - .run(partition, record2, restrictionTracker, outputReceiver, watermarkEstimator); + .run( + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(outputReceiver), + eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -188,9 +212,19 @@ public void testQueryChangeStreamWithHeartbeatRecord() { when(resultSet.getMetadata()).thenReturn(resultSetMetadata); when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); - when(heartbeatRecordAction.run(partition, record1, restrictionTracker, watermarkEstimator)) + when(heartbeatRecordAction.run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) .thenReturn(Optional.empty()); - when(heartbeatRecordAction.run(partition, record2, restrictionTracker, watermarkEstimator)) + when(heartbeatRecordAction.run( + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -199,12 +233,24 @@ public void testQueryChangeStreamWithHeartbeatRecord() { partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); assertEquals(ProcessContinuation.stop(), result); - verify(heartbeatRecordAction).run(partition, record1, restrictionTracker, watermarkEstimator); - verify(heartbeatRecordAction).run(partition, record2, restrictionTracker, watermarkEstimator); + verify(heartbeatRecordAction) + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); + verify(heartbeatRecordAction) + .run( + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -230,10 +276,18 @@ public void testQueryChangeStreamWithChildPartitionsRecord() { when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); when(childPartitionsRecordAction.run( - partition, record1, restrictionTracker, watermarkEstimator)) + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) .thenReturn(Optional.empty()); when(childPartitionsRecordAction.run( - partition, record2, restrictionTracker, watermarkEstimator)) + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -243,13 +297,23 @@ public void testQueryChangeStreamWithChildPartitionsRecord() { assertEquals(ProcessContinuation.stop(), result); verify(childPartitionsRecordAction) - .run(partition, record1, restrictionTracker, watermarkEstimator); + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); verify(childPartitionsRecordAction) - .run(partition, record2, restrictionTracker, watermarkEstimator); + .run( + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -279,7 +343,11 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStart() { when(changeStreamRecordMapper.toChangeStreamRecords(partition, resultSet, resultSetMetadata)) .thenReturn(Arrays.asList(record1, record2)); when(childPartitionsRecordAction.run( - partition, record2, restrictionTracker, watermarkEstimator)) + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator))) .thenReturn(Optional.of(ProcessContinuation.stop())); when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); @@ -289,13 +357,23 @@ public void testQueryChangeStreamWithRestrictionFromAfterPartitionStart() { assertEquals(ProcessContinuation.stop(), result); verify(childPartitionsRecordAction) - .run(partition, record1, restrictionTracker, watermarkEstimator); + .run( + eq(partition), + eq(record1), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); verify(childPartitionsRecordAction) - .run(partition, record2, restrictionTracker, watermarkEstimator); + .run( + eq(partition), + eq(record2), + eq(restrictionTracker), + any(RestrictionInterrupter.class), + eq(watermarkEstimator)); verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); verify(restrictionTracker, never()).tryClaim(any()); } @@ -320,9 +398,9 @@ public void testQueryChangeStreamWithStreamFinished() { verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP); verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); } private static class BundleFinalizerStub implements BundleFinalizer { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index 538bdf768664..87588eb8d0a9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -149,9 +149,9 @@ public void testQueryChangeStreamMode() { verify(queryChangeStreamAction) .run(partition, tracker, receiver, watermarkEstimator, bundleFinalizer); - verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any()); - verify(heartbeatRecordAction, never()).run(any(), any(), any(), any()); - verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any()); + verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any()); + verify(heartbeatRecordAction, never()).run(any(), any(), any(), any(), any()); + verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any(), any()); verify(tracker, never()).tryClaim(any()); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java new file mode 100644 index 000000000000..6d376ec528ba --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; + +public class RestrictionInterrupterTest { + + @Test + public void testTryInterrupt() { + RestrictionInterrupter interrupter = + new RestrictionInterrupter( + () -> Instant.ofEpochSecond(0), Duration.standardSeconds(30)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(10)); + assertFalse(interrupter.tryInterrupt(1)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(15)); + assertFalse(interrupter.tryInterrupt(2)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(30)); + assertFalse(interrupter.tryInterrupt(3)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); + // Though the deadline has passed same position as previously accepted is not interrupted. + assertFalse(interrupter.tryInterrupt(3)); + assertTrue(interrupter.tryInterrupt(4)); + assertTrue(interrupter.tryInterrupt(5)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50)); + assertTrue(interrupter.tryInterrupt(5)); + // Even with non-monotonic clock the interrupter will now always interrupt. + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); + assertTrue(interrupter.tryInterrupt(5)); + } + + @Test + public void testTryInterruptNoPreviousPosition() { + RestrictionInterrupter interrupter = + new RestrictionInterrupter( + () -> Instant.ofEpochSecond(0), Duration.standardSeconds(30)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); + assertFalse(interrupter.tryInterrupt(1)); + // Though the deadline has passed same position as previously accepted is not interrupted. + assertFalse(interrupter.tryInterrupt(1)); + assertTrue(interrupter.tryInterrupt(2)); + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50)); + assertTrue(interrupter.tryInterrupt(3)); + } +}