[feat 32473] Added soft deadline logic to Spanner Change Stream IO connector. #32474

Open · wants to merge 5 commits into master · Changes from 3 commits
ChildPartitionsRecordAction.java
@@ -26,6 +26,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
@@ -105,6 +106,7 @@ public Optional<ProcessContinuation> run(
PartitionMetadata partition,
ChildPartitionsRecord record,
RestrictionTracker<TimestampRange, Timestamp> tracker,
RestrictionInterrupter<Timestamp> interrupter,
ManualWatermarkEstimator<Instant> watermarkEstimator) {

final String token = partition.getPartitionToken();
@@ -113,6 +115,13 @@ public Optional<ProcessContinuation> run(

final Timestamp startTimestamp = record.getStartTimestamp();
final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime());
if (interrupter.tryInterrupt(startTimestamp)) {
LOG.debug(
"[{}] Soft deadline reached with child partitions record at {}, rescheduling",
token,
startTimestamp);
return Optional.of(ProcessContinuation.resume());
}
if (!tracker.tryClaim(startTimestamp)) {
LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, startTimestamp);
return Optional.of(ProcessContinuation.stop());
DataChangeRecordAction.java
@@ -24,6 +24,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
@@ -80,6 +81,7 @@ public Optional<ProcessContinuation> run(
PartitionMetadata partition,
DataChangeRecord record,
RestrictionTracker<TimestampRange, Timestamp> tracker,
RestrictionInterrupter<Timestamp> interrupter,
OutputReceiver<DataChangeRecord> outputReceiver,
ManualWatermarkEstimator<Instant> watermarkEstimator) {

@@ -88,6 +90,13 @@ public Optional<ProcessContinuation> run(

final Timestamp commitTimestamp = record.getCommitTimestamp();
final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime());
if (interrupter.tryInterrupt(commitTimestamp)) {
LOG.debug(
"[{}] Soft deadline reached with data change record at {}, rescheduling",
token,
commitTimestamp);
return Optional.of(ProcessContinuation.resume());
}
if (!tracker.tryClaim(commitTimestamp)) {
LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, commitTimestamp);
return Optional.of(ProcessContinuation.stop());
HeartbeatRecordAction.java
@@ -22,6 +22,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
@@ -72,13 +73,19 @@ public Optional<ProcessContinuation> run(
PartitionMetadata partition,
HeartbeatRecord record,
RestrictionTracker<TimestampRange, Timestamp> tracker,
RestrictionInterrupter<Timestamp> interrupter,
ManualWatermarkEstimator<Instant> watermarkEstimator) {

final String token = partition.getPartitionToken();
LOG.debug("[{}] Processing heartbeat record {}", token, record);

final Timestamp timestamp = record.getTimestamp();
final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime());
if (interrupter.tryInterrupt(timestamp)) {
LOG.debug(
"[{}] Soft deadline reached with heartbeat record at {}, rescheduling", token, timestamp);
return Optional.of(ProcessContinuation.resume());
}
if (!tracker.tryClaim(timestamp)) {
LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, timestamp);
return Optional.of(ProcessContinuation.stop());
QueryChangeStreamAction.java
@@ -33,6 +33,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -62,6 +63,7 @@ public class QueryChangeStreamAction {

private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);
private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5);
private static final Duration RESTRICTION_TRACKER_TIMEOUT = Duration.standardSeconds(40);
private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid";

private final ChangeStreamDao changeStreamDao;
@@ -164,6 +166,10 @@ public ProcessContinuation run(
new IllegalStateException(
"Partition " + token + " not found in metadata table"));

// Interrupter with soft timeout to commit the work if any records have been processed.
RestrictionInterrupter<Timestamp> interrupter =
RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);

try (ChangeStreamResultSet resultSet =
changeStreamDao.changeStreamQuery(
token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) {
@@ -182,16 +188,25 @@ public ProcessContinuation run(
updatedPartition,
(DataChangeRecord) record,
tracker,
interrupter,
receiver,
watermarkEstimator);
} else if (record instanceof HeartbeatRecord) {
maybeContinuation =
heartbeatRecordAction.run(
updatedPartition, (HeartbeatRecord) record, tracker, watermarkEstimator);
updatedPartition,
(HeartbeatRecord) record,
tracker,
interrupter,
watermarkEstimator);
} else if (record instanceof ChildPartitionsRecord) {
maybeContinuation =
childPartitionsRecordAction.run(
updatedPartition, (ChildPartitionsRecord) record, tracker, watermarkEstimator);
updatedPartition,
(ChildPartitionsRecord) record,
tracker,
interrupter,
watermarkEstimator);
} else {
LOG.error("[{}] Unknown record type {}", token, record.getClass());
throw new IllegalArgumentException("Unknown record type " + record.getClass());
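The three record actions above share the same control-flow change: consult the interrupter before attempting to claim the next change-stream position, and translate an interrupt into ProcessContinuation.resume() so the restriction is checkpointed and rescheduled, while a failed claim still maps to ProcessContinuation.stop(). The following is a minimal sketch of that pattern, not code from this PR; ClaimPatternSketch and SimpleTracker are illustrative stand-ins for the action classes and Beam's RestrictionTracker.

import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;

public class ClaimPatternSketch {
  // Illustrative stand-in for Beam's RestrictionTracker; not part of this PR.
  interface SimpleTracker<T> {
    boolean tryClaim(T position);
  }

  static <T> Optional<ProcessContinuation> process(
      T position, RestrictionInterrupter<T> interrupter, SimpleTracker<T> tracker) {
    if (interrupter.tryInterrupt(position)) {
      // Soft deadline reached after earlier positions were claimed:
      // checkpoint here and resume later instead of claiming more work.
      return Optional.of(ProcessContinuation.resume());
    }
    if (!tracker.tryClaim(position)) {
      // Restriction exhausted or split: stop processing this partition.
      return Optional.of(ProcessContinuation.stop());
    }
    // Otherwise process the record and keep going.
    return Optional.empty();
  }
}

In QueryChangeStreamAction the interrupter itself is created once per run() invocation via RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT), as shown in the hunk above.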
RestrictionInterrupter.java (new file)
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;

import java.util.function.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** An interrupter for restriction trackers claiming positions of type {@code T}. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class RestrictionInterrupter<T> {
private T lastAttemptedPosition;

private Supplier<Instant> timeSupplier;
private Instant softDeadline;
private boolean hasInterrupted = true;

/**
* Creates an interrupter with a soft timeout, measured from now, for processing new positions.
* After the timeout {@code tryInterrupt} will start returning true, indicating an early exit
* from processing.
*/
public static <T> RestrictionInterrupter<T> withSoftTimeout(Duration timeout) {
return new RestrictionInterrupter<T>(() -> Instant.now(), timeout);
}

RestrictionInterrupter(Supplier<Instant> timeSupplier, Duration timeout) {
this.timeSupplier = timeSupplier;
this.softDeadline = this.timeSupplier.get().plus(timeout);
hasInterrupted = false;
}

@VisibleForTesting
void setTimeSupplier(Supplier<Instant> timeSupplier) {
this.timeSupplier = timeSupplier;
}

/**
* Returns true if the restriction tracker should be interrupted in claiming new positions.
*
* <ol>
* <li>If the soft deadline hasn't been reached, always returns false.
* <li>If the soft deadline has been reached but no positions have been processed yet, returns
* false.
* <li>If the soft deadline has been reached but the new position is the same as the last
* attempted position, returns false.
* <li>If the soft deadline has been reached and the new position differs from the last attempted
* position, returns true.
* </ol>
*
* @return {@code true} if claiming new positions should be interrupted because the soft deadline
* has been reached and the previously attempted position has been fully processed, {@code
* false} otherwise.
*/
public boolean tryInterrupt(T position) {
if (hasInterrupted) {
return true;
}
if (lastAttemptedPosition == null) {
lastAttemptedPosition = position;
return false;
}

hasInterrupted |=
timeSupplier.get().isAfter(softDeadline) && !position.equals(lastAttemptedPosition);
lastAttemptedPosition = position;
return hasInterrupted;
}
}
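To make the interrupter's semantics concrete, here is a hedged, standalone usage sketch (not part of this PR). The class name RestrictionInterrupterSketch and the printed values are illustrative; it assumes the Beam Google Cloud Platform IO module containing the new class is on the classpath.

import com.google.cloud.Timestamp;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.joda.time.Duration;

public class RestrictionInterrupterSketch {
  public static void main(String[] args) {
    // Same soft timeout as QueryChangeStreamAction's RESTRICTION_TRACKER_TIMEOUT.
    RestrictionInterrupter<Timestamp> interrupter =
        RestrictionInterrupter.withSoftTimeout(Duration.standardSeconds(40));

    Timestamp first = Timestamp.ofTimeMicroseconds(10L);
    Timestamp second = Timestamp.ofTimeMicroseconds(20L);

    // The very first position is never interrupted; it only becomes the baseline.
    System.out.println(interrupter.tryInterrupt(first)); // false
    // Re-attempting the same position is never interrupted either.
    System.out.println(interrupter.tryInterrupt(first)); // false
    // A different position is interrupted only once the soft deadline has passed,
    // so within the 40-second window this still prints false.
    System.out.println(interrupter.tryInterrupt(second)); // false
  }
}

Once the deadline has passed, the next distinct position returns true, and the callers above respond by returning ProcessContinuation.resume().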
ChildPartitionsRecordActionTest.java
@@ -38,6 +38,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestTransactionAnswer;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
@@ -55,6 +56,7 @@ public class ChildPartitionsRecordActionTest {
private ChangeStreamMetrics metrics;
private ChildPartitionsRecordAction action;
private RestrictionTracker<TimestampRange, Timestamp> tracker;
private RestrictionInterrupter<Timestamp> interrupter;
private ManualWatermarkEstimator<Instant> watermarkEstimator;

@Before
@@ -64,6 +66,7 @@ public void setUp() {
metrics = mock(ChangeStreamMetrics.class);
action = new ChildPartitionsRecordAction(dao, metrics);
tracker = mock(RestrictionTracker.class);
interrupter = mock(RestrictionInterrupter.class);
watermarkEstimator = mock(ManualWatermarkEstimator.class);

when(dao.runInTransaction(any(), anyObject()))
@@ -93,7 +96,7 @@ public void testRestrictionClaimedAndIsSplitCase() {
when(transaction.getPartition("childPartition2")).thenReturn(null);

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, watermarkEstimator);
action.run(partition, record, tracker, interrupter, watermarkEstimator);

assertEquals(Optional.empty(), maybeContinuation);
verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
@@ -144,7 +147,7 @@ public void testRestrictionClaimedAnsIsSplitCaseAndChildExists() {
when(transaction.getPartition("childPartition2")).thenReturn(mock(Struct.class));

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, watermarkEstimator);
action.run(partition, record, tracker, interrupter, watermarkEstimator);

assertEquals(Optional.empty(), maybeContinuation);
verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
@@ -173,7 +176,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildNotExists() {
when(transaction.getPartition(childPartitionToken)).thenReturn(null);

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, watermarkEstimator);
action.run(partition, record, tracker, interrupter, watermarkEstimator);

assertEquals(Optional.empty(), maybeContinuation);
verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
@@ -213,7 +216,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildExists() {
when(transaction.getPartition(childPartitionToken)).thenReturn(mock(Struct.class));

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, watermarkEstimator);
action.run(partition, record, tracker, interrupter, watermarkEstimator);

assertEquals(Optional.empty(), maybeContinuation);
verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
@@ -237,10 +240,35 @@ public void testRestrictionNotClaimed() {
when(tracker.tryClaim(startTimestamp)).thenReturn(false);

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, watermarkEstimator);
action.run(partition, record, tracker, interrupter, watermarkEstimator);

assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
verify(watermarkEstimator, never()).setWatermark(any());
verify(dao, never()).insert(any());
}

@Test
public void testSoftDeadlineReached() {
final String partitionToken = "partitionToken";
final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
final PartitionMetadata partition = mock(PartitionMetadata.class);
final ChildPartitionsRecord record =
new ChildPartitionsRecord(
startTimestamp,
"recordSequence",
Arrays.asList(
new ChildPartition("childPartition1", partitionToken),
new ChildPartition("childPartition2", partitionToken)),
null);
when(partition.getPartitionToken()).thenReturn(partitionToken);
when(interrupter.tryInterrupt(startTimestamp)).thenReturn(true);
when(tracker.tryClaim(startTimestamp)).thenReturn(true);

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, interrupter, watermarkEstimator);

assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
verify(watermarkEstimator, never()).setWatermark(any());
verify(dao, never()).insert(any());
}
}
DataChangeRecordActionTest.java
@@ -30,6 +30,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
@@ -44,6 +45,7 @@ public class DataChangeRecordActionTest {
private DataChangeRecordAction action;
private PartitionMetadata partition;
private RestrictionTracker<TimestampRange, Timestamp> tracker;
private RestrictionInterrupter<Timestamp> interrupter;
private OutputReceiver<DataChangeRecord> outputReceiver;
private ManualWatermarkEstimator<Instant> watermarkEstimator;
private BytesThroughputEstimator<DataChangeRecord> throughputEstimator;
@@ -54,6 +56,7 @@ public void setUp() {
action = new DataChangeRecordAction(throughputEstimator);
partition = mock(PartitionMetadata.class);
tracker = mock(RestrictionTracker.class);
interrupter = mock(RestrictionInterrupter.class);
outputReceiver = mock(OutputReceiver.class);
watermarkEstimator = mock(ManualWatermarkEstimator.class);
}
@@ -69,7 +72,7 @@ public void testRestrictionClaimed() {
when(partition.getPartitionToken()).thenReturn(partitionToken);

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, outputReceiver, watermarkEstimator);
action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator);

assertEquals(Optional.empty(), maybeContinuation);
verify(outputReceiver).outputWithTimestamp(record, instant);
@@ -87,11 +90,30 @@ public void testRestrictionNotClaimed() {
when(partition.getPartitionToken()).thenReturn(partitionToken);

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, outputReceiver, watermarkEstimator);
action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator);

assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
verify(outputReceiver, never()).outputWithTimestamp(any(), any());
verify(watermarkEstimator, never()).setWatermark(any());
verify(throughputEstimator, never()).update(any(), any());
}

@Test
public void testSoftDeadlineReached() {
final String partitionToken = "partitionToken";
final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
final DataChangeRecord record = mock(DataChangeRecord.class);
when(record.getCommitTimestamp()).thenReturn(timestamp);
when(interrupter.tryInterrupt(timestamp)).thenReturn(true);
when(tracker.tryClaim(timestamp)).thenReturn(true);
when(partition.getPartitionToken()).thenReturn(partitionToken);

final Optional<ProcessContinuation> maybeContinuation =
action.run(partition, record, tracker, interrupter, outputReceiver, watermarkEstimator);

assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
verify(outputReceiver, never()).outputWithTimestamp(any(), any());
verify(watermarkEstimator, never()).setWatermark(any());
verify(throughputEstimator, never()).update(any(), any());
}
}