diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java index 291213e93ada..715c18098c00 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java @@ -95,11 +95,14 @@ public class ChildPartitionsRecordAction { * @param record the change stream child partition record received * @param tracker the restriction tracker of the {@link * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF + * @param interrupter the restriction interrupter suggesting early termination of the processing * @param watermarkEstimator the watermark estimator of the {@link * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF * @return {@link Optional#empty()} if the caller can continue processing more records. A non * empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable - * to claim the {@link ChildPartitionsRecord} timestamp + * to claim the {@link ChildPartitionsRecord} timestamp. A non empty {@link Optional} with + * {@link ProcessContinuation#resume()} if this function should commit what has already been + * processed and resume. */ @VisibleForTesting public Optional run( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java index 9913d854c5fa..555b1fefbebc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java @@ -21,7 +21,6 @@ import java.util.Optional; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn; import org.apache.beam.sdk.io.gcp.spanner.changestreams.estimator.ThroughputEstimator; -import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.RestrictionInterrupter; @@ -69,12 +68,15 @@ public DataChangeRecordAction(ThroughputEstimator throughputEs * @param partition the current partition being processed * @param record the change stream data record received * @param tracker the restriction tracker of the {@link ReadChangeStreamPartitionDoFn} SDF + * @param interrupter the restriction interrupter suggesting early termination of the processing * @param outputReceiver the output receiver of the {@link ReadChangeStreamPartitionDoFn} SDF * @param watermarkEstimator the watermark estimator of the {@link ReadChangeStreamPartitionDoFn} * SDF * @return {@link Optional#empty()} if the caller can continue processing more records. A non * empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable - * to claim the {@link ChildPartitionsRecord} timestamp + * to claim the {@link DataChangeRecord} timestamp. A non empty {@link Optional} with {@link + * ProcessContinuation#resume()} if this function should commit what has already been + * processed and resume. */ @VisibleForTesting public Optional run( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java index 446907b9024a..0937e896fbf1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordAction.java @@ -57,7 +57,9 @@ public class HeartbeatRecordAction { * not. If the {@link Optional} returned is empty, it means that the calling function can continue * with the processing. If an {@link Optional} of {@link ProcessContinuation#stop()} is returned, * it means that this function was unable to claim the timestamp of the {@link HeartbeatRecord}, - * so the caller should stop. + * so the caller should stop. If an {@link Optional} of {@link ProcessContinuation#resume()} is + * returned, it means that this function should not attempt to claim further timestamps of the + * {@link HeartbeatRecord}, but instead should commit what it has processed so far. * *

When processing the {@link HeartbeatRecord} the following procedure is applied: * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 67c71b0b46d0..6edbd544a37c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -63,6 +63,12 @@ public class QueryChangeStreamAction { private static final Logger LOG = LoggerFactory.getLogger(QueryChangeStreamAction.class); private static final Duration BUNDLE_FINALIZER_TIMEOUT = Duration.standardMinutes(5); + /* + * Corresponds to the best effort timeout in case the restriction tracker cannot split the processing + * interval before the hard deadline. When reached it will assure that the already processed timestamps + * will be committed instead of thrown away (DEADLINE_EXCEEDED). The value should be less than + * the RetrySetting RPC timeout setting of SpannerIO#ReadChangeStream. + */ private static final Duration RESTRICTION_TRACKER_TIMEOUT = Duration.standardSeconds(40); private static final String OUT_OF_RANGE_ERROR_MESSAGE = "Specified start_timestamp is invalid"; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java index 81696550747a..37e91911867a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java @@ -19,18 +19,17 @@ import java.util.function.Supplier; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; /** An interrupter for restriction tracker of type T. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class RestrictionInterrupter { - private T lastAttemptedPosition; + private @Nullable T lastAttemptedPosition; private Supplier timeSupplier; - private Instant softDeadline; + private final Instant softDeadline; private boolean hasInterrupted = true; /** @@ -67,7 +66,7 @@ void setTimeSupplier(Supplier timeSupplier) { * @return {@code true} if the position processing should continue, {@code false} if the soft * deadline has been reached and we have fully processed the previous position. */ - public boolean tryInterrupt(T position) { + public boolean tryInterrupt(@NonNull T position) { if (hasInterrupted) { return true; } @@ -75,10 +74,12 @@ public boolean tryInterrupt(T position) { lastAttemptedPosition = position; return false; } - - hasInterrupted |= - timeSupplier.get().isAfter(softDeadline) && !position.equals(lastAttemptedPosition); + if (position.equals(lastAttemptedPosition)) { + return false; + } lastAttemptedPosition = position; + + hasInterrupted |= timeSupplier.get().isAfter(softDeadline); return hasInterrupted; } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java index dc76465dbf39..6d376ec528ba 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java @@ -38,11 +38,15 @@ public void testTryInterrupt() { interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(30)); assertFalse(interrupter.tryInterrupt(3)); interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); + // Though the deadline has passed same position as previously accepted is not interrupted. assertFalse(interrupter.tryInterrupt(3)); assertTrue(interrupter.tryInterrupt(4)); assertTrue(interrupter.tryInterrupt(5)); interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50)); assertTrue(interrupter.tryInterrupt(5)); + // Even with non-monotonic clock the interrupter will now always interrupt. + interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); + assertTrue(interrupter.tryInterrupt(5)); } @Test @@ -52,6 +56,7 @@ public void testTryInterruptNoPreviousPosition() { () -> Instant.ofEpochSecond(0), Duration.standardSeconds(30)); interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(40)); assertFalse(interrupter.tryInterrupt(1)); + // Though the deadline has passed same position as previously accepted is not interrupted. assertFalse(interrupter.tryInterrupt(1)); assertTrue(interrupter.tryInterrupt(2)); interrupter.setTimeSupplier(() -> Instant.ofEpochSecond(50));