diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java index 12cbecd04b02..9ad3141f9666 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -164,7 +165,9 @@ public void checkDone() throws IllegalStateException { @Override public IsBounded isBounded() { - return IsBounded.BOUNDED; + return range.getTo() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis() + ? IsBounded.UNBOUNDED + : IsBounded.BOUNDED; } @Override @@ -213,6 +216,13 @@ public RestrictionTracker.TruncateResult truncate() { return null; } + @GetSize + public double getSize( + @Element SequenceDefinition sequence, @Restriction OffsetRange offsetRange) { + long nowMilliSec = Instant.now().getMillis(); + return sequenceBacklogBytes(sequence.durationMilliSec, nowMilliSec, offsetRange); + } + @ProcessElement public ProcessContinuation processElement( @Element SequenceDefinition srcElement, @@ -257,4 +267,26 @@ public ProcessContinuation processElement( public PCollection expand(PCollection input) { return input.apply(ParDo.of(new PeriodicSequenceFn())); } + + private static final int ENCODED_INSTANT_BYTES = 8; + + private static long ceilDiv(long a, long b) { + long result = Math.floorDiv(a, b); + if (a % b != 0) { + ++result; + } + return result; + } + + @VisibleForTesting + static long sequenceBacklogBytes( + long durationMilliSec, long nowMilliSec, OffsetRange offsetRange) { + // Find the # of outputs expected for overlap of offsetRange and [-inf, now) + long start = ceilDiv(offsetRange.getFrom(), durationMilliSec); + long end = ceilDiv(Math.min(nowMilliSec, offsetRange.getTo() - 1), durationMilliSec); + if (start >= end) { + return 0; + } + return ENCODED_INSTANT_BYTES * (end - start); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java index 3ace145eba88..541a70933870 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PeriodicSequenceTest.java @@ -24,6 +24,7 @@ import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; +import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -136,4 +137,25 @@ public void testOutputsProperElements() { p.run().waitUntilFinish(); } + + @Test + public void testBacklogBytes() { + assertEquals( + 0, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(100, Long.MAX_VALUE))); + assertEquals( + 8, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(90, Long.MAX_VALUE))); + assertEquals( + 0, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(91, Long.MAX_VALUE))); + assertEquals( + 8, PeriodicSequence.sequenceBacklogBytes(10, 100, new OffsetRange(89, Long.MAX_VALUE))); + assertEquals( + 16, PeriodicSequence.sequenceBacklogBytes(10, 101, new OffsetRange(81, Long.MAX_VALUE))); + assertEquals( + 8 * 10000 / 100, + PeriodicSequence.sequenceBacklogBytes(100, 10000, new OffsetRange(0, Long.MAX_VALUE))); + assertEquals( + 0, PeriodicSequence.sequenceBacklogBytes(10, 10000, new OffsetRange(10011, 10025))); + assertEquals( + 8, PeriodicSequence.sequenceBacklogBytes(10, 10100, new OffsetRange(10011, 10025))); + } }