Skip to content

Commit

Permalink
added requested comments and changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dedocibula committed Dec 10, 2024
1 parent 636f02c commit 8fd02f1
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,14 @@ public class ChildPartitionsRecordAction {
* @param record the change stream child partition record received
* @param tracker the restriction tracker of the {@link
* org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
* @param interrupter the restriction interrupter suggesting early termination of the processing
* @param watermarkEstimator the watermark estimator of the {@link
* org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
* @return {@link Optional#empty()} if the caller can continue processing more records. A non
* empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable
* to claim the {@link ChildPartitionsRecord} timestamp
* to claim the {@link ChildPartitionsRecord} timestamp. A non empty {@link Optional} with
* {@link ProcessContinuation#resume()} if this function should commit what has already been
* processed and resume.
*/
@VisibleForTesting
public Optional<ProcessContinuation> run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter;
Expand Down Expand Up @@ -69,12 +68,15 @@ public DataChangeRecordAction(ThroughputEstimator<DataChangeRecord> throughputEs
* @param partition the current partition being processed
* @param record the change stream data record received
* @param tracker the restriction tracker of the {@link ReadChangeStreamPartitionDoFn} SDF
* @param interrupter the restriction interrupter suggesting early termination of the processing
* @param outputReceiver the output receiver of the {@link ReadChangeStreamPartitionDoFn} SDF
* @param watermarkEstimator the watermark estimator of the {@link ReadChangeStreamPartitionDoFn}
* SDF
* @return {@link Optional#empty()} if the caller can continue processing more records. A non
* empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable
* to claim the {@link ChildPartitionsRecord} timestamp
* to claim the {@link DataChangeRecord} timestamp. A non empty {@link Optional} with {@link
* ProcessContinuation#resume()} if this function should commit what has already been
* processed and resume.
*/
@VisibleForTesting
public Optional<ProcessContinuation> run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public class HeartbeatRecordAction {
* not. If the {@link Optional} returned is empty, it means that the calling function can continue
* with the processing. If an {@link Optional} of {@link ProcessContinuation#stop()} is returned,
* it means that this function was unable to claim the timestamp of the {@link HeartbeatRecord},
* so the caller should stop.
* so the caller should stop. If an {@link Optional} of {@link ProcessContinuation#resume()} is
* returned, it means that this function should not attempt to claim further timestamps of the
* {@link HeartbeatRecord}, but instead should commit what it has processed so far.
*
* <p>When processing the {@link HeartbeatRecord} the following procedure is applied:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public class QueryChangeStreamAction {

private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class);
private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5);
/*
* Corresponds to the best effort timeout in case the restriction tracker cannot split the processing
* interval before the hard deadline. When reached it will assure that the already processed timestamps
* will be committed instead of thrown away (DEADLINE_EXCEEDED). The value should be less than
* the RetrySetting RPC timeout setting of SpannerIO#ReadChangeStream.
*/
private static final Duration RESTRICTION_TRACKER_TIMEOUT = Duration.standardSeconds(40);
private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@

import java.util.function.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** An interrupter for restriction tracker of type T. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class RestrictionInterrupter<T> {
private T lastAttemptedPosition;
private @Nullable T lastAttemptedPosition;

private Supplier<Instant> timeSupplier;
private Instant softDeadline;
private final Instant softDeadline;
private boolean hasInterrupted = true;

/**
Expand Down Expand Up @@ -67,18 +66,20 @@ void setTimeSupplier(Supplier<Instant> timeSupplier) {
* @return {@code true} if the position processing should continue, {@code false} if the soft
* deadline has been reached and we have fully processed the previous position.
*/
public boolean tryInterrupt(T position) {
public boolean tryInterrupt(@NonNull T position) {
if (hasInterrupted) {
return true;
}
if (lastAttemptedPosition == null) {
lastAttemptedPosition = position;
return false;
}

hasInterrupted |=
timeSupplier.get().isAfter(softDeadline) && !position.equals(lastAttemptedPosition);
if (position.equals(lastAttemptedPosition)) {
return false;
}
lastAttemptedPosition = position;

hasInterrupted |= timeSupplier.get().isAfter(softDeadline);
return hasInterrupted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ public void testTryInterrupt() {
interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(30));
assertFalse(interrupter.tryInterrupt(3));
interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40));
// Though the deadline has passed same position as previously accepted is not interrupted.
assertFalse(interrupter.tryInterrupt(3));
assertTrue(interrupter.tryInterrupt(4));
assertTrue(interrupter.tryInterrupt(5));
interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50));
assertTrue(interrupter.tryInterrupt(5));
// Even with non-monotonic clock the interrupter will now always interrupt.
interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40));
assertTrue(interrupter.tryInterrupt(5));
}

@Test
Expand All @@ -52,6 +56,7 @@ public void testTryInterruptNoPreviousPosition() {
() -> Instant.ofEpochSecond(0), Duration.standardSeconds(30));
interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40));
assertFalse(interrupter.tryInterrupt(1));
// Though the deadline has passed same position as previously accepted is not interrupted.
assertFalse(interrupter.tryInterrupt(1));
assertTrue(interrupter.tryInterrupt(2));
interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50));
Expand Down

0 comments on commit 8fd02f1

Please sign in to comment.