added interruptible interface to the read change stream partition record tracker
dedocibula committed Sep 16, 2024
1 parent e005591 commit 02bdee7
Showing 12 changed files with 246 additions and 20 deletions.
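The change applies one pattern across the record actions below: before claiming a record's timestamp, each action now asks its restriction tracker whether a soft processing deadline has passed, and if so returns ProcessContinuation.resume() so the partition is rescheduled rather than claimed past the deadline. A minimal sketch of that shared gate, using only APIs visible in this diff (the helper class and method names are illustrative, not part of the commit):

    import com.google.cloud.Timestamp;
    import java.util.Optional;
    import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible;
    import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
    import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

    class SoftDeadlineGate {
      // Returns resume() when the soft deadline has passed, stop() when the claim
      // fails, and empty when the caller may go on to process the record at `ts`.
      static Optional<ProcessContinuation> gate(
          RestrictionTracker<TimestampRange, Timestamp> tracker, Timestamp ts) {
        if (tracker instanceof Interruptible && !((Interruptible) tracker).shouldContinue(ts)) {
          return Optional.of(ProcessContinuation.resume()); // reschedule; keep the restriction
        }
        if (!tracker.tryClaim(ts)) {
          return Optional.of(ProcessContinuation.stop()); // restriction exhausted
        }
        return Optional.empty(); // claimed: process the record
      }
    }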
ChildPartitionsRecordAction.java
@@ -26,6 +26,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
@@ -113,6 +114,14 @@ public Optional<ProcessContinuation> run(

final Timestamp startTimestamp = record.getStartTimestamp();
final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime());
+if (tracker instanceof Interruptible
+&& !((Interruptible) tracker).shouldContinue(startTimestamp)) {
+LOG.debug(
+"[{}] Soft deadline reached with child partitions record at {}, rescheduling",
+token,
+startTimestamp);
+return Optional.of(ProcessContinuation.resume());
+}
if (!tracker.tryClaim(startTimestamp)) {
LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, startTimestamp);
return Optional.of(ProcessContinuation.stop());
DataChangeRecordAction.java
@@ -24,6 +24,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
@@ -43,7 +44,9 @@ public class DataChangeRecordAction {
private static final Logger LOG = LoggerFactory.getLogger(DataChangeRecordAction.class);
private final ThroughputEstimator<DataChangeRecord> throughputEstimator;

-/** @param throughputEstimator an estimator to calculate local throughput of this action. */
+/**
+* @param throughputEstimator an estimator to calculate local throughput of this action.
+*/
public DataChangeRecordAction(ThroughputEstimator<DataChangeRecord> throughputEstimator) {
this.throughputEstimator = throughputEstimator;
}
@@ -88,6 +91,14 @@ public Optional<ProcessContinuation> run(

final Timestamp commitTimestamp = record.getCommitTimestamp();
final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime());
+if (tracker instanceof Interruptible
+&& !((Interruptible) tracker).shouldContinue(commitTimestamp)) {
+LOG.debug(
+"[{}] Soft deadline reached with data change record at {}, rescheduling",
+token,
+commitTimestamp);
+return Optional.of(ProcessContinuation.resume());
+}
if (!tracker.tryClaim(commitTimestamp)) {
LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, commitTimestamp);
return Optional.of(ProcessContinuation.stop());
HeartbeatRecordAction.java
@@ -22,6 +22,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
@@ -79,6 +80,11 @@ public Optional<ProcessContinuation> run(

final Timestamp timestamp = record.getTimestamp();
final Instant timestampInstant = new Instant(timestamp.toSqlTimestamp().getTime());
+if (tracker instanceof Interruptible && !((Interruptible) tracker).shouldContinue(timestamp)) {
+LOG.debug(
+"[{}] Soft deadline reached with heartbeat record at {}, rescheduling", token, timestamp);
+return Optional.of(ProcessContinuation.resume());
+}
if (!tracker.tryClaim(timestamp)) {
LOG.debug("[{}] Could not claim queryChangeStream({}), stopping", token, timestamp);
return Optional.of(ProcessContinuation.stop());
QueryChangeStreamAction.java
@@ -33,6 +33,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -62,6 +63,7 @@ public class QueryChangeStreamAction {

private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);
private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5);
+private static final Duration RESTRICTION_TRACKER_TIMEOUT = Duration.standardSeconds(40);
private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid";

private final ChangeStreamDao changeStreamDao;
@@ -164,6 +166,11 @@ public ProcessContinuation run(
new IllegalStateException(
"Partition " + token + " not found in metadata table"));

+// Set the soft timeout to commit the work if any records have been processed.
+if (tracker instanceof Interruptible) {
+((Interruptible) tracker).setSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);
+}
+
try (ChangeStreamResultSet resultSet =
changeStreamDao.changeStreamQuery(
token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) {
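Two timeouts now coexist in QueryChangeStreamAction: BUNDLE_FINALIZER_TIMEOUT (5 minutes) still bounds bundle finalization, while the new RESTRICTION_TRACKER_TIMEOUT (40 seconds) is the soft deadline armed above, once per change stream query. Per the comment in the diff, the intent appears to be that a long-running query yields within roughly 40 seconds of starting, so that records already processed can be committed instead of the restriction being held until the runner forces a checkpoint.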
Interruptible.java (new file)
@@ -0,0 +1,36 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import com.google.cloud.Timestamp;
+import org.joda.time.Duration;
+
+/** An interruptible interface for a timestamp restriction tracker. */
+public interface Interruptible {
+/** Sets a soft timeout from now for processing new timestamps. */
+public void setSoftTimeout(Duration duration);
+
+/**
+* Returns true if the timestamp tracker can process new timestamps, or false if it should
+* interrupt processing.
+*
+* @return {@code true} if the position processing should continue, {@code false} if the soft
+* deadline has been reached and we have fully processed the previous position.
+*/
+public boolean shouldContinue(Timestamp position);
+}
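Taken together, the two methods define a lifecycle: a long-running caller arms the deadline once, then polls shouldContinue before each new position. A sketch of that lifecycle, assuming a tracker that implements Interruptible (the helper class is illustrative; the commit's real implementation follows in ReadChangeStreamPartitionRangeTracker):

    import com.google.cloud.Timestamp;
    import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.Interruptible;
    import org.joda.time.Duration;

    class SoftTimeoutLifecycle {
      // Drains positions until the tracker reports that its soft deadline has passed.
      static void drain(Interruptible tracker, Iterable<Timestamp> positions) {
        tracker.setSoftTimeout(Duration.standardSeconds(40)); // armed once per query
        for (Timestamp position : positions) {
          if (!tracker.shouldContinue(position)) {
            return; // soft deadline passed: defer the remaining positions
          }
          // ... claim (tryClaim) and process the record at `position` ...
        }
      }
    }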
ReadChangeStreamPartitionRangeTracker.java
@@ -22,6 +22,8 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;

/**
* This restriction tracker delegates most of its behavior to an internal {@link
@@ -34,9 +36,12 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
-public class ReadChangeStreamPartitionRangeTracker extends TimestampRangeTracker {
+public class ReadChangeStreamPartitionRangeTracker extends TimestampRangeTracker
+implements Interruptible {

private final PartitionMetadata partition;
+private Instant softDeadline;
+private boolean continueProcessing = true;

/**
* Receives the partition that will be queried and the timestamp range that belongs to it.
@@ -49,6 +54,48 @@ public ReadChangeStreamPartitionRangeTracker(PartitionMetadata partition, Timest
this.partition = partition;
}

+/**
+* Sets a soft timeout from now for processing new positions. After the timeout, shouldContinue
+* will start returning false, indicating an early exit from processing.
+*/
+@Override
+public void setSoftTimeout(Duration duration) {
+softDeadline = new Instant(timeSupplier.get().toSqlTimestamp()).plus(duration);
+continueProcessing = true;
+}
+
+/**
+* Returns true if the restriction tracker can claim new positions.
+*
+* <p>If a soft timeout isn't set, always returns true. Otherwise:
+*
+* <ol>
+* <li>If the soft deadline hasn't been reached, always returns true.
+* <li>If the soft deadline has been reached but we haven't processed any positions, returns true.
+* <li>If the soft deadline has been reached but the new position is the same as the last attempted
+* position, returns true.
+* <li>If the soft deadline has been reached and the new position differs from the last attempted
+* position, returns false.
+* </ol>
+*
+* @return {@code true} if the position processing should continue, {@code false} if the soft
+* deadline has been reached and we have fully processed the previous position.
+*/
+@Override
+public boolean shouldContinue(Timestamp position) {
+if (!continueProcessing) {
+return false;
+}
+if (softDeadline == null || lastAttemptedPosition == null) {
+return true;
+}
+
+continueProcessing &=
+new Instant(timeSupplier.get().toSqlTimestamp()).isBefore(softDeadline)
+|| position.equals(lastAttemptedPosition);
+return continueProcessing;
+}
+
/**
* Attempts to claim the given position.
*
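To make the javadoc's cases concrete, a walk-through under stated assumptions: the soft timeout is 40 seconds, armed at t = 0; the tracker's clock (timeSupplier) advances as annotated; and ts1 < ts2 < ts3 are illustrative positions:

    // t = 5s   shouldContinue(ts1) -> true   (lastAttemptedPosition is still null)
    //          tryClaim(ts1)       -> true   (sets lastAttemptedPosition = ts1)
    // t = 39s  shouldContinue(ts2) -> true   (soft deadline not yet reached)
    //          tryClaim(ts2)       -> true   (sets lastAttemptedPosition = ts2)
    // t = 41s  shouldContinue(ts2) -> true   (deadline passed, but same as last attempted)
    // t = 41s  shouldContinue(ts3) -> false  (deadline passed, new position)
    // t = 41s  shouldContinue(ts2) -> false  (continueProcessing has latched to false)

Note the latch: once shouldContinue returns false, it keeps returning false until setSoftTimeout is called again, which resets continueProcessing to true.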
ChildPartitionsRecordActionTest.java
@@ -38,11 +38,10 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
-import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestTransactionAnswer;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;
import org.junit.Before;
@@ -54,7 +53,7 @@ public class ChildPartitionsRecordActionTest {
private InTransactionContext transaction;
private ChangeStreamMetrics metrics;
private ChildPartitionsRecordAction action;
-private RestrictionTracker<TimestampRange, Timestamp> tracker;
+private ReadChangeStreamPartitionRangeTracker tracker;
private ManualWatermarkEstimator<Instant> watermarkEstimator;

@Before
@@ -63,7 +62,7 @@ public void setUp() {
transaction = mock(InTransactionContext.class);
metrics = mock(ChangeStreamMetrics.class);
action = new ChildPartitionsRecordAction(dao, metrics);
-tracker = mock(RestrictionTracker.class);
+tracker = mock(ReadChangeStreamPartitionRangeTracker.class);
watermarkEstimator = mock(ManualWatermarkEstimator.class);

when(dao.runInTransaction(any(), anyObject()))
@@ -88,6 +87,7 @@ public void testRestrictionClaimedAndIsSplitCase() {
when(partition.getEndTimestamp()).thenReturn(endTimestamp);
when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
when(partition.getPartitionToken()).thenReturn(partitionToken);
+when(tracker.shouldContinue(startTimestamp)).thenReturn(true);
when(tracker.tryClaim(startTimestamp)).thenReturn(true);
when(transaction.getPartition("childPartition1")).thenReturn(null);
when(transaction.getPartition("childPartition2")).thenReturn(null);
@@ -139,6 +139,7 @@ public void testRestrictionClaimedAnsIsSplitCaseAndChildExists() {
when(partition.getEndTimestamp()).thenReturn(endTimestamp);
when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
when(partition.getPartitionToken()).thenReturn(partitionToken);
+when(tracker.shouldContinue(startTimestamp)).thenReturn(true);
when(tracker.tryClaim(startTimestamp)).thenReturn(true);
when(transaction.getPartition("childPartition1")).thenReturn(mock(Struct.class));
when(transaction.getPartition("childPartition2")).thenReturn(mock(Struct.class));
@@ -169,6 +170,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildNotExists() {
when(partition.getEndTimestamp()).thenReturn(endTimestamp);
when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
when(partition.getPartitionToken()).thenReturn(partitionToken);
+when(tracker.shouldContinue(startTimestamp)).thenReturn(true);
when(tracker.tryClaim(startTimestamp)).thenReturn(true);
when(transaction.getPartition(childPartitionToken)).thenReturn(null);

@@ -209,6 +211,7 @@ public void testRestrictionClaimedAndIsMergeCaseAndChildExists() {
when(partition.getEndTimestamp()).thenReturn(endTimestamp);
when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
when(partition.getPartitionToken()).thenReturn(partitionToken);
+when(tracker.shouldContinue(startTimestamp)).thenReturn(true);
when(tracker.tryClaim(startTimestamp)).thenReturn(true);
when(transaction.getPartition(childPartitionToken)).thenReturn(mock(Struct.class));

@@ -234,6 +237,7 @@ public void testRestrictionNotClaimed() {
new ChildPartition("childPartition2", partitionToken)),
null);
when(partition.getPartitionToken()).thenReturn(partitionToken);
+when(tracker.shouldContinue(startTimestamp)).thenReturn(true);
when(tracker.tryClaim(startTimestamp)).thenReturn(false);

final Optional<ProcessContinuation> maybeContinuation =
@@ -243,4 +247,29 @@
verify(watermarkEstimator, never()).setWatermark(any());
verify(dao, never()).insert(any());
}

+@Test
+public void testSoftDeadlineReached() {
+final String partitionToken = "partitionToken";
+final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+final PartitionMetadata partition = mock(PartitionMetadata.class);
+final ChildPartitionsRecord record =
+new ChildPartitionsRecord(
+startTimestamp,
+"recordSequence",
+Arrays.asList(
+new ChildPartition("childPartition1", partitionToken),
+new ChildPartition("childPartition2", partitionToken)),
+null);
+when(partition.getPartitionToken()).thenReturn(partitionToken);
+when(tracker.shouldContinue(startTimestamp)).thenReturn(false);
+when(tracker.tryClaim(startTimestamp)).thenReturn(true);
+
+final Optional<ProcessContinuation> maybeContinuation =
+action.run(partition, record, tracker, watermarkEstimator);
+
+assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
+verify(watermarkEstimator, never()).setWatermark(any());
+verify(dao, never()).insert(any());
+}
}
DataChangeRecordActionTest.java
@@ -30,11 +30,10 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.BytesThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
-import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
@@ -43,7 +42,7 @@ public class DataChangeRecordActionTest {

private DataChangeRecordAction action;
private PartitionMetadata partition;
-private RestrictionTracker<TimestampRange, Timestamp> tracker;
+private ReadChangeStreamPartitionRangeTracker tracker;
private OutputReceiver<DataChangeRecord> outputReceiver;
private ManualWatermarkEstimator<Instant> watermarkEstimator;
private BytesThroughputEstimator<DataChangeRecord> throughputEstimator;
@@ -53,7 +52,7 @@ public void setUp() {
throughputEstimator = mock(BytesThroughputEstimator.class);
action = new DataChangeRecordAction(throughputEstimator);
partition = mock(PartitionMetadata.class);
-tracker = mock(RestrictionTracker.class);
+tracker = mock(ReadChangeStreamPartitionRangeTracker.class);
outputReceiver = mock(OutputReceiver.class);
watermarkEstimator = mock(ManualWatermarkEstimator.class);
}
@@ -65,6 +64,7 @@ public void testRestrictionClaimed() {
final Instant instant = new Instant(timestamp.toSqlTimestamp().getTime());
final DataChangeRecord record = mock(DataChangeRecord.class);
when(record.getCommitTimestamp()).thenReturn(timestamp);
+when(tracker.shouldContinue(timestamp)).thenReturn(true);
when(tracker.tryClaim(timestamp)).thenReturn(true);
when(partition.getPartitionToken()).thenReturn(partitionToken);

@@ -83,6 +83,7 @@ public void testRestrictionNotClaimed() {
final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
final DataChangeRecord record = mock(DataChangeRecord.class);
when(record.getCommitTimestamp()).thenReturn(timestamp);
+when(tracker.shouldContinue(timestamp)).thenReturn(true);
when(tracker.tryClaim(timestamp)).thenReturn(false);
when(partition.getPartitionToken()).thenReturn(partitionToken);

@@ -94,4 +95,23 @@
verify(watermarkEstimator, never()).setWatermark(any());
verify(throughputEstimator, never()).update(any(), any());
}

+@Test
+public void testSoftDeadlineReached() {
+final String partitionToken = "partitionToken";
+final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+final DataChangeRecord record = mock(DataChangeRecord.class);
+when(record.getCommitTimestamp()).thenReturn(timestamp);
+when(tracker.shouldContinue(timestamp)).thenReturn(false);
+when(tracker.tryClaim(timestamp)).thenReturn(true);
+when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+final Optional<ProcessContinuation> maybeContinuation =
+action.run(partition, record, tracker, outputReceiver, watermarkEstimator);
+
+assertEquals(Optional.of(ProcessContinuation.resume()), maybeContinuation);
+verify(outputReceiver, never()).outputWithTimestamp(any(), any());
+verify(watermarkEstimator, never()).setWatermark(any());
+verify(throughputEstimator, never()).update(any(), any());
+}
}
(Diff truncated: 4 of the 12 changed files are not shown.)
