Skip to content

Commit

Permalink
[feat 32473] Added soft deadline logic to Spanner Change Stream IO co…
Browse files Browse the repository at this point in the history
…nnector. (#32474)

* added explicit restriction interrupter class
  • Loading branch information
dedocibula authored Jan 7, 2025
1 parent e657d6c commit 43b2bf7
Show file tree
Hide file tree
Showing 11 changed files with 409 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
Expand Down Expand Up @@ -94,17 +95,21 @@ public class ChildPartitionsRecordAction {
* @param record the change stream child partition record received
* @param tracker the restriction tracker of the {@link
* org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
* @param interrupter the restriction interrupter suggesting early termination of the processing
* @param watermarkEstimator the watermark estimator of the {@link
* org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
* @return {@link Optional#empty()} if the caller can continue processing more records. A non
* empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable
* to claim the {@link ChildPartitionsRecord} timestamp
* to claim the {@link ChildPartitionsRecord} timestamp. A non empty {@link Optional} with
* {@link ProcessContinuation#resume()} if this function should commit what has already been
* processed and resume.
*/
@VisibleForTesting
public Optional<ProcessContinuation> run(
PartitionMetadata partition,
ChildPartitionsRecord record,
RestrictionTracker<TimestampRange, Timestamp> tracker,
RestrictionInterrupter<Timestamp> interrupter,
ManualWatermarkEstimator<Instant> watermarkEstimator) {

final String token = partition.getPartitionToken();
Expand All @@ -113,6 +118,13 @@ public Optional<ProcessContinuation> run(

final Timestamp startTimestamp = record.getStartTimestamp();
final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime());
if (interrupter.tryInterrupt(startTimestamp)) {
LOG.debug(
"[{}] Soft deadline reached with child partitions record at {}, rescheduling",
token,
startTimestamp);
return Optional.of(ProcessContinuation.resume());
}
if (!tracker.tryClaim(startTimestamp)) {
LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, startTimestamp);
return Optional.of(ProcessContinuation.stop());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
Expand Down Expand Up @@ -68,18 +68,22 @@ public DataChangeRecordAction(ThroughputEstimator<DataChangeRecord> throughputEs
* @param partition the current partition being processed
* @param record the change stream data record received
* @param tracker the restriction tracker of the {@link ReadChangeStreamPartitionDoFn} SDF
* @param interrupter the restriction interrupter suggesting early termination of the processing
* @param outputReceiver the output receiver of the {@link ReadChangeStreamPartitionDoFn} SDF
* @param watermarkEstimator the watermark estimator of the {@link ReadChangeStreamPartitionDoFn}
* SDF
* @return {@link Optional#empty()} if the caller can continue processing more records. A non
* empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable
* to claim the {@link ChildPartitionsRecord} timestamp
* to claim the {@link DataChangeRecord} timestamp. A non empty {@link Optional} with {@link
* ProcessContinuation#resume()} if this function should commit what has already been
* processed and resume.
*/
@VisibleForTesting
public Optional<ProcessContinuation> run(
PartitionMetadata partition,
DataChangeRecord record,
RestrictionTracker<TimestampRange, Timestamp> tracker,
RestrictionInterrupter<Timestamp> interrupter,
OutputReceiver<DataChangeRecord> outputReceiver,
ManualWatermarkEstimator<Instant> watermarkEstimator) {

Expand All @@ -88,6 +92,13 @@ public Optional<ProcessContinuation> run(

final Timestamp commitTimestamp = record.getCommitTimestamp();
final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime());
if (interrupter.tryInterrupt(commitTimestamp)) {
LOG.debug(
"[{}] Soft deadline reached with data change record at {}, rescheduling",
token,
commitTimestamp);
return Optional.of(ProcessContinuation.resume());
}
if (!tracker.tryClaim(commitTimestamp)) {
LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, commitTimestamp);
return Optional.of(ProcessContinuation.stop());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
Expand Down Expand Up @@ -56,7 +57,9 @@ public class HeartbeatRecordAction {
* not. If the {@link Optional} returned is empty, it means that the calling function can continue
* with the processing. If an {@link Optional} of {@link ProcessContinuation#stop()} is returned,
* it means that this function was unable to claim the timestamp of the {@link HeartbeatRecord},
* so the caller should stop.
* so the caller should stop. If an {@link Optional} of {@link ProcessContinuation#resume()} is
* returned, it means that this function should not attempt to claim further timestamps of the
* {@link HeartbeatRecord}, but instead should commit what it has processed so far.
*
* <p>When processing the {@link HeartbeatRecord} the following procedure is applied:
*
Expand All @@ -72,13 +75,19 @@ public Optional<ProcessContinuation> run(
PartitionMetadata partition,
HeartbeatRecord record,
RestrictionTracker<TimestampRange, Timestamp> tracker,
RestrictionInterrupter<Timestamp> interrupter,
ManualWatermarkEstimator<Instant> watermarkEstimator) {

final String token = partition.getPartitionToken();
LOG.debug("[{}] Processing heartbeat record {}", token, record);

final Timestamp timestamp = record.getTimestamp();
final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime());
if (interrupter.tryInterrupt(timestamp)) {
LOG.debug(
"[{}] Soft deadline reached with heartbeat record at {}, rescheduling", token, timestamp);
return Optional.of(ProcessContinuation.resume());
}
if (!tracker.tryClaim(timestamp)) {
LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, timestamp);
return Optional.of(ProcessContinuation.stop());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
Expand Down Expand Up @@ -62,6 +63,13 @@ public class QueryChangeStreamAction {

private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);
private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5);
/*
* Best-effort soft timeout for the case where the restriction tracker cannot split the processing
* interval before the hard deadline. Once it is reached, the timestamps that have already been
* processed are committed instead of being thrown away (DEADLINE_EXCEEDED). The value should be
* less than the RetrySetting RPC timeout setting of SpannerIO#ReadChangeStream.
*/
private static final Duration RESTRICTION_TRACKER_TIMEOUT = Duration.standardSeconds(40);
private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid";

private final ChangeStreamDao changeStreamDao;
Expand Down Expand Up @@ -164,6 +172,10 @@ public ProcessContinuation run(
new IllegalStateException(
"Partition " + token + " not found in metadata table"));

// Interrupter with soft timeout to commit the work if any records have been processed.
RestrictionInterrupter<Timestamp> interrupter =
RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);

try (ChangeStreamResultSet resultSet =
changeStreamDao.changeStreamQuery(
token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) {
Expand All @@ -182,16 +194,25 @@ public ProcessContinuation run(
updatedPartition,
(DataChangeRecord) record,
tracker,
interrupter,
receiver,
watermarkEstimator);
} else if (record instanceof HeartbeatRecord) {
maybeContinuation =
heartbeatRecordAction.run(
updatedPartition, (HeartbeatRecord) record, tracker, watermarkEstimator);
updatedPartition,
(HeartbeatRecord) record,
tracker,
interrupter,
watermarkEstimator);
} else if (record instanceof ChildPartitionsRecord) {
maybeContinuation =
childPartitionsRecordAction.run(
updatedPartition, (ChildPartitionsRecord) record, tracker, watermarkEstimator);
updatedPartition,
(ChildPartitionsRecord) record,
tracker,
interrupter,
watermarkEstimator);
} else {
LOG.error("[{}] Unknown record type {}", token, record.getClass());
throw new IllegalArgumentException("Unknown record type " + record.getClass());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;

import java.util.function.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * An interrupter for restriction tracker of type T.
 *
 * <p>The interrupter is given a soft deadline at construction time. Callers pass every position
 * they are about to claim to {@link #tryInterrupt(Object)}; once the deadline has passed and a new
 * position is seen, the interrupter asks the caller to stop claiming further positions.
 */
public class RestrictionInterrupter<T> {
  /** The most recent position passed to {@link #tryInterrupt(Object)}, or null if none yet. */
  private @Nullable T lastAttemptedPosition;

  // Intentionally not final: setTimeSupplier allows the clock to be replaced.
  private Supplier<Instant> timeSupplier;
  /** Point in time after which a change of position triggers the interruption. */
  private final Instant softDeadline;
  /** Sticky flag: once set, every subsequent tryInterrupt call returns true. */
  private boolean hasInterrupted;

  /**
   * Sets a soft timeout from now for processing new positions. After the timeout the tryInterrupt
   * will start returning true indicating an early exit from processing.
   *
   * @param timeout the duration, measured from the moment of this call, after which interruption
   *     becomes possible
   * @return a new interrupter whose clock is {@link Instant#now()}
   */
  public static <T> RestrictionInterrupter<T> withSoftTimeout(Duration timeout) {
    return new RestrictionInterrupter<T>(Instant::now, timeout);
  }

  /**
   * Creates an interrupter whose soft deadline is the supplier's current time plus the timeout.
   *
   * @param timeSupplier the clock used both to compute the deadline and to check it later
   * @param timeout the duration added to the current time to obtain the soft deadline
   */
  RestrictionInterrupter(Supplier<Instant> timeSupplier, Duration timeout) {
    this.timeSupplier = timeSupplier;
    this.softDeadline = this.timeSupplier.get().plus(timeout);
  }

  /** Replaces the clock consulted by {@link #tryInterrupt(Object)}; the deadline is unchanged. */
  @VisibleForTesting
  void setTimeSupplier(Supplier<Instant> timeSupplier) {
    this.timeSupplier = timeSupplier;
  }

  /**
   * Returns true if the restriction tracker should be interrupted in claiming new positions.
   *
   * <ol>
   *   <li>If an earlier call has already returned true, always returns true.
   *   <li>If no position has been attempted yet, records the position and returns false without
   *       consulting the clock.
   *   <li>If the new position is the same as the last attempted position returns false, so that
   *       all records sharing a position are processed together.
   *   <li>If the new position differs from the last attempted position, records it and returns
   *       true only when the current time is strictly after the soft deadline.
   * </ol>
   *
   * @param position the position the caller is about to claim
   * @return {@code true} if the soft deadline has been reached and the previous position has been
   *     fully processed, meaning the caller should stop claiming positions; {@code false} if the
   *     position processing should continue
   */
  public boolean tryInterrupt(@NonNull T position) {
    if (hasInterrupted) {
      return true;
    }
    if (lastAttemptedPosition == null) {
      lastAttemptedPosition = position;
      return false;
    }
    if (position.equals(lastAttemptedPosition)) {
      return false;
    }
    lastAttemptedPosition = position;

    // The flag is known to be false here, so a plain assignment is sufficient.
    hasInterrupted = timeSupplier.get().isAfter(softDeadline);
    return hasInterrupted;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestTransactionAnswer;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
Expand All @@ -55,6 +56,7 @@ public class ChildPartitionsRecordActionTest {
private ChangeStreamMetrics metrics;
private ChildPartitionsRecordAction action;
private RestrictionTracker<TimestampRange, Timestamp> tracker;
private RestrictionInterrupter<Timestamp> interrupter;
private ManualWatermarkEstimator<Instant> watermarkEstimator;

@Before
Expand All @@ -64,6 +66,7 @@ public void setUp() {
metrics = mock(ChangeStreamMetrics.class);
action = new ChildPartitionsRecordAction(dao, metrics);
tracker = mock(RestrictionTracker.class);
interrupter = mock(RestrictionInterrupter.class);
watermarkEstimator = mock(ManualWatermarkEstimator.class);

when(dao.runInTransaction(any(), anyObject()))
Expand Down Expand Up @@ -93,7 +96,7 @@ public void testRestrictionClaimedAndIsSplitCase() {
when(transaction.getPartition("childPartition2")).thenReturn(null);

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, watermarkEstimator);
action.run(partition, record, tracker, interrupter, watermarkEstimator);

assertEquals(Optional.empty(), maybeContinuation);
verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
Expand Down Expand Up @@ -144,7 +147,7 @@ public void testRestrictionClaimedAnsIsSplitCaseAndChildExists() {
when(transaction.getPartition("childPartition2")).thenReturn(mock(Struct.class));

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, watermarkEstimator);
action.run(partition, record, tracker, interrupter, watermarkEstimator);

assertEquals(Optional.empty(), maybeContinuation);
verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
Expand Down Expand Up @@ -173,7 +176,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildNotExists() {
when(transaction.getPartition(childPartitionToken)).thenReturn(null);

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, watermarkEstimator);
action.run(partition, record, tracker, interrupter, watermarkEstimator);

assertEquals(Optional.empty(), maybeContinuation);
verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
Expand Down Expand Up @@ -213,7 +216,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildExists() {
when(transaction.getPartition(childPartitionToken)).thenReturn(mock(Struct.class));

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, watermarkEstimator);
action.run(partition, record, tracker, interrupter, watermarkEstimator);

assertEquals(Optional.empty(), maybeContinuation);
verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
Expand All @@ -237,10 +240,35 @@ public void testRestrictionNotClaimed() {
when(tracker.tryClaim(startTimestamp)).thenReturn(false);

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, watermarkEstimator);
action.run(partition, record, tracker, interrupter, watermarkEstimator);

assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
verify(watermarkEstimator, never()).setWatermark(any());
verify(dao, never()).insert(any());
}

@Test
public void testSoftDeadlineReached() {
  // Fixture: a child partitions record with two children, both parented by the same token.
  final String token = "partitionToken";
  final Timestamp recordStart = Timestamp.ofTimeMicroseconds(10L);
  final ChildPartition firstChild = new ChildPartition("childPartition1", token);
  final ChildPartition secondChild = new ChildPartition("childPartition2", token);
  final ChildPartitionsRecord childPartitionsRecord =
      new ChildPartitionsRecord(
          recordStart, "recordSequence", Arrays.asList(firstChild, secondChild), null);
  final PartitionMetadata partitionMetadata = mock(PartitionMetadata.class);

  // The tracker is stubbed to accept the claim, while the interrupter reports the soft deadline.
  when(partitionMetadata.getPartitionToken()).thenReturn(token);
  when(tracker.tryClaim(recordStart)).thenReturn(true);
  when(interrupter.tryInterrupt(recordStart)).thenReturn(true);

  final Optional<ProcessContinuation> result =
      action.run(
          partitionMetadata, childPartitionsRecord, tracker, interrupter, watermarkEstimator);

  // Expect a resume continuation with no inserts and no watermark update.
  assertEquals(Optional.of(ProcessContinuation.resume()), result);
  verify(dao, never()).insert(any());
  verify(watermarkEstimator, never()).setWatermark(any());
}
}
Loading

0 comments on commit 43b2bf7

Please sign in to comment.