diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index 5ea2c4968dd9..9cf0606b68b9 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -21,6 +21,7 @@ import com.google.auto.service.AutoService; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -669,6 +670,15 @@ public void output(RestrictionT part) { public void outputWithTimestamp(RestrictionT part, Instant timestamp) { throw new UnsupportedOperationException(); } + + @Override + public void outputWindowedValue( + RestrictionT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + throw new UnsupportedOperationException(); + } }; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java index 061e1cb11b5c..ad6b51539742 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import java.util.Collection; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements; @@ -525,6 +526,15 @@ public void output(OutputT output) { public void 
outputWithTimestamp(OutputT output, Instant timestamp) { outerContext.outputWithTimestamp(output, timestamp); } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outerContext.outputWindowedValue(output, timestamp, windows, paneInfo); + } }; } @@ -543,6 +553,15 @@ public void output(T output) { public void outputWithTimestamp(T output, Instant timestamp) { outerContext.outputWithTimestamp(tag, output, timestamp); } + + @Override + public void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo); + } }; } @@ -583,6 +602,15 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outerContext.outputWithTimestamp(output, timestamp); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outerContext.outputWindowedValue(output, timestamp, windows, paneInfo); + } + @Override public void output(TupleTag tag, T output) { outerContext.output(tag, output); @@ -593,6 +621,16 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp outerContext.outputWithTimestamp(tag, output, timestamp); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo); + } + @Override public InputT element() { return element; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 85a46eb7dc04..51cd8c690aee 100644 --- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; @@ -42,6 +43,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -388,11 +390,20 @@ public void output(OutputT output) { @Override public void outputWithTimestamp(OutputT value, Instant timestamp) { + outputWindowedValue(value, timestamp, element.getWindows(), element.getPane()); + } + + @Override + public void outputWindowedValue( + OutputT value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { noteOutput(); if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); } - output.outputWindowedValue(value, timestamp, element.getWindows(), element.getPane()); + output.outputWindowedValue(value, timestamp, windows, paneInfo); } @Override @@ -402,11 +413,21 @@ public void output(TupleTag tag, T value) { @Override public void outputWithTimestamp(TupleTag tag, T value, Instant timestamp) { + outputWindowedValue(tag, value, timestamp, element.getWindows(), 
element.getPane()); + } + + @Override + public void outputWindowedValue( + TupleTag tag, + T value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { noteOutput(); if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); } - output.outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane()); + output.outputWindowedValue(tag, value, timestamp, windows, paneInfo); } private void noteOutput() { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 16986fdf8d52..b375d38c5a98 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -413,22 +414,40 @@ public void output(OutputT output) { @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - checkTimestamp(elem.getTimestamp(), timestamp); outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); + } + @Override public void output(TupleTag tag, T output) { checkNotNull(tag, "Tag passed to output cannot be null"); - outputWindowedValue(tag, elem.withValue(output)); + SimpleDoFnRunner.this.outputWindowedValue(tag, elem.withValue(output)); } @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkNotNull(tag, 
"Tag passed to outputWithTimestamp cannot be null"); checkTimestamp(elem.getTimestamp(), timestamp); - outputWindowedValue( - tag, WindowedValue.of(output, timestamp, elem.getWindows(), elem.getPane())); + outputWindowedValue(tag, output, timestamp, elem.getWindows(), elem.getPane()); + } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + SimpleDoFnRunner.this.outputWindowedValue( + tag, WindowedValue.of(output, timestamp, windows, paneInfo)); } @Override @@ -838,16 +857,38 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); + } + @Override public void output(TupleTag tag, T output) { checkTimestamp(timestamp(), timestamp); - outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); + outputWithTimestamp(tag, output, timestamp); } @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkTimestamp(timestamp(), timestamp); - outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); + outputWindowedValue( + tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); + } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp(), timestamp); + SimpleDoFnRunner.this.outputWindowedValue( + tag, WindowedValue.of(output, timestamp, windows, paneInfo)); } @Override @@ -1045,16 +1086,38 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWindowedValue( + OutputT 
output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); + } + @Override public void output(TupleTag tag, T output) { checkTimestamp(this.timestamp, timestamp); - outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); + outputWithTimestamp(tag, output, timestamp); } @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkTimestamp(this.timestamp, timestamp); - outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); + outputWindowedValue( + tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); + } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(this.timestamp, timestamp); + SimpleDoFnRunner.this.outputWindowedValue( + tag, WindowedValue.of(output, timestamp, windows, paneInfo)); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java index 079379953cd9..dc81e23b10ee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java @@ -55,7 +55,10 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -241,6 +244,37 @@ public static TimestampedValues timestamped( return 
timestamped(ImmutableList.>builder().add(elem).add(elems).build()); } + /** + * Returns a new {@link Create.WindowedValues} transform that produces a {@link PCollection} + * containing the elements of the provided {@code Iterable} with the specified windowing metadata. + * + *

The argument should not be modified after this is called. + * + *

By default, {@code Create.WindowedValues} can automatically determine the {@code Coder} to + * use if all elements have the same non-parameterized run-time class, and a default coder is + * registered for that class. See {@link CoderRegistry} for details on how defaults are + * determined. Otherwise, use {@link Create.WindowedValues#withCoder} to set the coder explicitly. + * + *

Likewise, the window coder can be inferred if the window type is registered with the {@link + * CoderRegistry}. Otherwise, use {@link Create.WindowedValues#withWindowCoder(Coder)} to set the + * window coder explicitly. + */ + public static WindowedValues windowedValues(Iterable> elems) { + return new WindowedValues<>(elems, Optional.absent(), Optional.absent(), Optional.absent()); + } + + /** + * Returns a new {@link Create.WindowedValues} transform that produces a {@link PCollection} + * containing the specified elements with the specified windowing metadata. + * + *

The arguments should not be modified after this is called. + */ + @SafeVarargs + public static WindowedValues windowedValues( + WindowedValue elem, @SuppressWarnings("unchecked") WindowedValue... elems) { + return windowedValues(ImmutableList.>builder().add(elem).add(elems).build()); + } + /** * Returns a new root transform that produces a {@link PCollection} containing the specified * elements with the specified timestamps. @@ -727,6 +761,163 @@ public void processElement(@Element TimestampedValue element, OutputReceiver< } } + /** + * A {@code PTransform} that creates a {@code PCollection} whose elements have associated + * windowing metadata. + */ + public static class WindowedValues extends PTransform> { + + /** + * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code + * Coder} to decode each of the objects into a value of type {@code T}. + * + *

By default, {@code Create.WindowedValues} can automatically determine the {@code Coder} + * to use if all elements have the same non-parameterized run-time class, and a default coder is + * registered for that class. See {@link CoderRegistry} for details on how defaults are + * determined. + * + *

Note that for {@link Create.WindowedValues} with no elements, the {@link VoidCoder} is + * used. + */ + public WindowedValues withCoder(Coder coder) { + return new WindowedValues<>(windowedValues, Optional.of(coder), windowCoder, typeDescriptor); + } + + /** + * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code + * Coder} to encode and decode the windows of the elements. + * + *

By default, {@code Create.WindowedValues} can automatically determine the {@code Coder} to + * use if all elements have the same non-parameterized run-time class, and a default coder is + * registered for that class. See {@link CoderRegistry} for details on how defaults are + * determined. + * + *

Note that for {@link Create.WindowedValues} with no elements, the {@link + * GlobalWindow.Coder} is used. + */ + public WindowedValues withWindowCoder(Coder windowCoder) { + return new WindowedValues<>( + windowedValues, elementCoder, Optional.of(windowCoder), typeDescriptor); + } + + /** + * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code + * Schema} to represent objects. + */ + public WindowedValues withSchema( + Schema schema, + TypeDescriptor typeDescriptor, + SerializableFunction toRowFunction, + SerializableFunction fromRowFunction) { + return withCoder(SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction)); + } + + /** + * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code + * TypeDescriptor} to determine the {@code Coder} to use to decode each of the objects into a + * value of type {@code T}. Note that a default coder must be registered for the class described + * in the {@code TypeDescriptor}. + * + *

By default, {@code Create.WindowedValues} can automatically determine the {@code Coder} + * to use if all elements have the same non-parameterized run-time class, and a default coder is + * registered for that class. See {@link CoderRegistry} for details on how defaults are + * determined. + * + *

Note that for {@link Create.WindowedValues} with no elements, the {@link VoidCoder} is + * used. + */ + public WindowedValues withType(TypeDescriptor type) { + return new WindowedValues<>(windowedValues, elementCoder, windowCoder, Optional.of(type)); + } + + @Override + public PCollection expand(PBegin input) { + try { + Coder coder = null; + CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry(); + SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry(); + if (elementCoder.isPresent()) { + coder = elementCoder.get(); + } else if (typeDescriptor.isPresent()) { + try { + coder = + SchemaCoder.of( + schemaRegistry.getSchema(typeDescriptor.get()), + typeDescriptor.get(), + schemaRegistry.getToRowFunction(typeDescriptor.get()), + schemaRegistry.getFromRowFunction(typeDescriptor.get())); + } catch (NoSuchSchemaException e) { + // No schema registered. + } + if (coder == null) { + coder = coderRegistry.getCoder(typeDescriptor.get()); + } + } else { + Iterable rawElements = Iterables.transform(windowedValues, WindowedValue::getValue); + coder = getDefaultCreateCoder(coderRegistry, schemaRegistry, rawElements); + } + + Coder windowCoder; + if (this.windowCoder.isPresent()) { + windowCoder = this.windowCoder.get(); + } else if (Iterables.isEmpty(windowedValues)) { + windowCoder = GlobalWindow.Coder.INSTANCE; + } else { + Iterable rawWindows = + Iterables.concat(Iterables.transform(windowedValues, WindowedValue::getWindows)); + windowCoder = getDefaultCreateCoder(coderRegistry, schemaRegistry, rawWindows); + } + + PCollection> intermediate = + Pipeline.applyTransform( + input, + Create.of(windowedValues) + .withCoder(WindowedValue.getFullCoder(coder, windowCoder))); + + PCollection output = intermediate.apply(ParDo.of(new ConvertWindowedValues<>())); + output.setCoder(coder); + return output; + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException( + "Unable to infer a coder and no Coder was specified. 
" + + "Please set a coder by invoking CreateTimestamped.withCoder() explicitly.", + e); + } + } + + ///////////////////////////////////////////////////////////////////////////// + + /** The timestamped elements of the resulting PCollection. */ + private final transient Iterable> windowedValues; + + /** The coder used to encode the values to and from a binary representation. */ + private final transient Optional> elementCoder; + + private final Optional> windowCoder; + + /** The value type. */ + private final transient Optional> typeDescriptor; + + private WindowedValues( + Iterable> windowedValues, + Optional> elementCoder, + Optional> windowCoder, + Optional> typeDescriptor) { + this.windowedValues = windowedValues; + this.elementCoder = elementCoder; + this.windowCoder = windowCoder; + this.typeDescriptor = typeDescriptor; + } + + private static class ConvertWindowedValues extends DoFn, T> { + @ProcessElement + public void processElement(@Element WindowedValue element, OutputReceiver r) { + r.outputWindowedValue( + element.getValue(), element.getTimestamp(), element.getWindows(), element.getPane()); + } + } + } + private static Coder getDefaultCreateCoder( CoderRegistry coderRegistry, SchemaRegistry schemaRegistry, Iterable elems) throws CannotProvideCoderException { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 2993d8cca97a..c22b726c99a2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -24,6 +24,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.util.Collection; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; @@ -185,6 +186,31 @@ public abstract class WindowedContext { */ 
public abstract void outputWithTimestamp(OutputT output, Instant timestamp); + /** + * Adds the given element to the main output {@code PCollection}, with the given windowing + * metadata. + * + *

Once passed to {@code outputWindowedValue} the element should not be modified in any way. + * + *

If invoked from {@link ProcessElement}, the timestamp must not be older than the input + * element's timestamp minus {@link DoFn#getAllowedTimestampSkew}. The output element will be in + * the given windows with the given pane info. + * + *

If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to + * determine what windows the element should be in, throwing an exception if the {@code + * WindowFn} attempts to access any information about the input element except for the + * timestamp. + * + *

Note: A splittable {@link DoFn} is not allowed to output from {@link StartBundle} + * or {@link FinishBundle} methods. + */ + public abstract void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo); + /** * Adds the given element to the output {@code PCollection} with the given tag. * @@ -231,6 +257,32 @@ public abstract class WindowedContext { * @see ParDo.SingleOutput#withOutputTags */ public abstract void outputWithTimestamp(TupleTag tag, T output, Instant timestamp); + + /** + * Adds the given element to the output {@code PCollection} with the given tag, with the given + * windowing metadata. + * + *

Once passed to {@code outputWindowedValue} the element should not be modified in any way. + * + *

If invoked from {@link ProcessElement}, the timestamp must not be older than the input + * element's timestamp minus {@link DoFn#getAllowedTimestampSkew}. The output element will be in + * the given windows with the given pane info. + * + *

If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to + * determine what windows the element should be in, throwing an exception if the {@code + * WindowFn} attempts to access any information about the input element except for the + * timestamp. + * + *

Note: A splittable {@link DoFn} is not allowed to output from {@link StartBundle} + * or {@link FinishBundle} methods. + */ + public abstract void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo); } /** Information accessible when running a {@link DoFn.ProcessElement} method. */ @@ -342,6 +394,12 @@ public interface OutputReceiver { void output(T output); void outputWithTimestamp(T output, Instant timestamp); + + void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo); } /** Receives tagged output for a multi-output function. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java index 4f3719f2cc0e..7adfd7768d34 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java @@ -20,12 +20,15 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import java.util.Collection; import java.util.Map; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.checkerframework.checker.nullness.qual.Nullable; @@ -58,6 +61,16 @@ public void output(Row output) { public void outputWithTimestamp(Row output, Instant timestamp) { 
outputReceiver.outputWithTimestamp(schemaCoder.getFromRowFunction().apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outputReceiver.outputWindowedValue( + schemaCoder.getFromRowFunction().apply(output), timestamp, windows, paneInfo); + } } private static class WindowedContextOutputReceiver implements OutputReceiver { @@ -87,6 +100,20 @@ public void outputWithTimestamp(T output, Instant timestamp) { ((DoFn.WindowedContext) context).outputWithTimestamp(output, timestamp); } } + + @Override + public void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + if (outputTag != null) { + context.outputWindowedValue(outputTag, output, timestamp, windows, paneInfo); + } else { + ((DoFn.WindowedContext) context) + .outputWindowedValue(output, timestamp, windows, paneInfo); + } + } } private static class WindowedContextMultiOutputReceiver implements MultiOutputReceiver { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index f72de2af35ef..c0915c2dcd75 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -581,6 +582,15 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); + } + @Override public void output(TupleTag tag, T output) { 
outputWithTimestamp(tag, output, element.getTimestamp()); @@ -591,6 +601,18 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp getMutableOutput(tag) .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane())); } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + for (BoundedWindow w : windows) { + getMutableOutput(tag).add(ValueInSingleWindow.of(output, timestamp, w, paneInfo)); + } + } } /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java index 28b92b390297..cadb2b33bb2a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindow.java @@ -54,6 +54,11 @@ public int hashCode() { return GlobalWindow.class.hashCode(); } + @Override + public String toString() { + return "GlobalWindow"; + } + private GlobalWindow() {} /** {@link Coder} for encoding and decoding {@code GlobalWindow}s. 
*/ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index 689e5af7055b..85c8d0d04ede 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.List; import java.util.Random; +import java.util.stream.Collectors; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; @@ -57,7 +58,11 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create.Values.CreateSource; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -240,6 +245,22 @@ public void processElement(ProcessContext c) { } } + private static class FormatMetadata extends DoFn { + @ProcessElement + public void processElement( + @Element String e, + @Timestamp Instant timestamp, + BoundedWindow w, + PaneInfo p, + OutputReceiver o) { + o.output(formatMetadata(e, timestamp, w, p)); + } + } + + private static String formatMetadata(String s, Instant timestamp, BoundedWindow w, PaneInfo p) { + return s + ":" + timestamp.getMillis() + ":" + w + ":" + p; + } + @Test @Category(NeedsRunner.class) public void testCreateTimestamped() { @@ -321,6 +342,66 @@ public void testCreateTimestampedDefaultOutputCoderUsingTypeDescriptor() throws assertThat(p.apply(values).getCoder(), equalTo(coder)); } + @Test + 
@Category(NeedsRunner.class) + public void testCreateWindowedValues() { + List> data = + Arrays.asList( + WindowedValue.of("a", new Instant(1L), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), + WindowedValue.of("b", new Instant(2L), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), + WindowedValue.of( + "c", new Instant(3L), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + // The easiest way to directly check the created PCollection with PAssert and without relying on + // other + // mechanisms than built-in DoFn processing is to dump it all to a string. + List formattedData = + data.stream() + .flatMap( + (WindowedValue windowedValue) -> + windowedValue.getWindows().stream() + .map( + (BoundedWindow w) -> + formatMetadata( + windowedValue.getValue(), + windowedValue.getTimestamp(), + w, + windowedValue.getPane()))) + .collect(Collectors.toList()); + + PCollection output = + p.apply(Create.windowedValues(data).withWindowCoder(GlobalWindow.Coder.INSTANCE)) + .apply(ParDo.of(new FormatMetadata())); + + PAssert.that(output).containsInAnyOrder(formattedData); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testCreateWindowedValuesEmpty() { + PCollection output = + p.apply( + Create.windowedValues(new ArrayList>()) + .withCoder(StringUtf8Coder.of())); + + PAssert.that(output).empty(); + p.run(); + } + + @Test + public void testCreateWindowedValuesEmptyUnspecifiedCoder() { + p.enableAbandonedNodeEnforcement(false); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("determine a default Coder"); + thrown.expectMessage("Create.empty(Coder)"); + thrown.expectMessage("Create.empty(TypeDescriptor)"); + thrown.expectMessage("withCoder(Coder)"); + thrown.expectMessage("withType(TypeDescriptor)"); + p.apply(Create.windowedValues(new ArrayList<>())); + } + @Test @Category(ValidatesRunner.class) public void testCreateWithVoidType() throws Exception { diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index a99d18a3d91f..bcd1f9ccf7b5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -41,6 +41,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; @@ -75,6 +76,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.UserCodeException; import org.joda.time.Instant; import org.junit.Before; @@ -556,6 +558,15 @@ public void output(SomeRestriction output) { public void outputWithTimestamp(SomeRestriction output, Instant timestamp) { fail("Unexpected output with timestamp"); } + + @Override + public void outputWindowedValue( + SomeRestriction output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + fail("Unexpected outputWindowedValue"); + } }; } }); @@ -800,6 +811,17 @@ public void outputWithTimestamp(String output, Instant instant) { invoked = true; assertEquals("foo", output); } + + @Override + public void outputWindowedValue( + String output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + assertFalse(invoked); + invoked = true; + assertEquals("foo", output); + } }; } }); diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java index d60ebe46b370..ad856c1c3a7d 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java @@ -26,6 +26,7 @@ import com.google.zetasql.Value; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,6 +49,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; @@ -361,6 +363,16 @@ public void output(Row output) { public void outputWithTimestamp(Row output, Instant timestamp) { c.output(tag, output, timestamp, w); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + throw new UnsupportedOperationException( + "outputWindowedValue not supported in finish bundle here"); + } } private static RuntimeException extractException(Throwable e) { diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 19c13775684e..69114703a528 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -2214,15 +2214,28 @@ public void output(TupleTag tag, T output) { @Override public void outputWithTimestamp(OutputT output, 
Instant timestamp) { - // TODO: Check that timestamp is valid once all runners can provide proper timestamps. + // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all + // runners can provide proper timestamps. outputTo( mainOutputConsumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane())); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all + // runners can provide proper timestamps. + outputTo(mainOutputConsumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - // TODO: Check that timestamp is valid once all runners can provide proper timestamps. + // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all + // runners can provide proper timestamps. FnDataReceiver> consumer = (FnDataReceiver) localNameToConsumer.get(tag.getId()); if (consumer == null) { @@ -2232,6 +2245,23 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp consumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane())); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + // TODO(https://github.com/apache/beam/issues/29637): Check that timestamp is valid once all + // runners can provide proper timestamps. 
+ FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @Override public State state(String stateId, boolean alwaysFetched) { StateDeclaration stateDeclaration = doFnSignature.stateDeclarations().get(stateId); @@ -2376,6 +2406,46 @@ public Instant timestamp(DoFn doFn) { currentElement.getPane())); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp); + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider( + this, this.errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn doFn) { + return timestamp; + } + + @Override + public RestrictionTracker restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + outputTo( + mainOutputConsumer, + (WindowedValue) + WindowedValue.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + timestamp, + windows, + paneInfo)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions @@ -2384,6 +2454,19 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. 
+ throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } + @Override public State state(String stateId, boolean alwaysFetched) { throw new UnsupportedOperationException( @@ -2491,6 +2574,46 @@ public Instant timestamp(DoFn doFn) { currentElement.getPane())); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp); + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider( + this, errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn doFn) { + return timestamp; + } + + @Override + public RestrictionTracker restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + outputTo( + mainOutputConsumer, + (WindowedValue) + WindowedValue.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + timestamp, + windows, + paneInfo)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions @@ -2498,6 +2621,19 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp throw new UnsupportedOperationException( String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. 
+ throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } } /** Provides arguments for a {@link DoFnInvoker} for a non-window observing method. */ @@ -2534,6 +2670,16 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { output, timestamp, currentElement.getWindows(), currentElement.getPane())); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp); + outputTo(mainOutputConsumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkTimestamp(timestamp); @@ -2547,6 +2693,22 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp WindowedValue.of( output, timestamp, currentElement.getWindows(), currentElement.getPane())); } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp); + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } } /** Provides base arguments for a {@link DoFnInvoker} for a non-window observing method. 
*/ @@ -2676,6 +2838,16 @@ public void outputWithTimestamp(Row output, Instant timestamp) { ProcessBundleContextBase.this.outputWithTimestamp( fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + ProcessBundleContextBase.this.outputWindowedValue( + fromRowFunction.apply(output), timestamp, windows, paneInfo); + } }; @Override @@ -2711,6 +2883,16 @@ public void output(T output) { public void outputWithTimestamp(T output, Instant timestamp) { ProcessBundleContextBase.this.outputWithTimestamp(tag, output, timestamp); } + + @Override + public void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + ProcessBundleContextBase.this.outputWindowedValue( + tag, output, timestamp, windows, paneInfo); + } }; } @@ -2746,6 +2928,16 @@ public void outputWithTimestamp(Row output, Instant timestamp) { ProcessBundleContextBase.this.outputWithTimestamp( tag, fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + ProcessBundleContextBase.this.outputWindowedValue( + tag, fromRowFunction.apply(output), timestamp, windows, paneInfo); + } }; } @@ -2860,6 +3052,16 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkOnWindowExpirationTimestamp(timestamp); + outputTo(mainOutputConsumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @Override public void output(TupleTag tag, T output) { FnDataReceiver> consumer = @@ -2885,6 +3087,23 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp consumer, WindowedValue.of(output, timestamp, 
currentWindow, currentTimer.getPane())); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkOnWindowExpirationTimestamp(timestamp); + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + // An unregistered tag is a caller error; report it instead of passing a null consumer on. + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @SuppressWarnings( "deprecation") // Allowed Skew is deprecated for users, but must be respected private void checkOnWindowExpirationTimestamp(Instant timestamp) { @@ -2956,6 +3175,16 @@ public void output(Row output) { public void outputWithTimestamp(Row output, Instant timestamp) { context.outputWithTimestamp(fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue( + fromRowFunction.apply(output), timestamp, windows, paneInfo); + } }; @Override @@ -2988,6 +3217,15 @@ public void output(T output) { public void outputWithTimestamp(T output, Instant timestamp) { context.outputWithTimestamp(tag, output, timestamp); } + + @Override + public void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue(tag, output, timestamp, windows, paneInfo); + } }; } @@ -3019,6 +3257,16 @@ public void output(Row output) { public void outputWithTimestamp(Row output, Instant timestamp) { context.outputWithTimestamp(tag, fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue( + tag, fromRowFunction.apply(output), timestamp, windows, paneInfo); + } }; } @@ -3104,6 +3352,16 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); } + 
@Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimerTimestamp(timestamp); + outputTo(mainOutputConsumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @Override public void output(TupleTag tag, T output) { checkTimerTimestamp(currentTimer.getHoldTimestamp()); @@ -3130,6 +3388,23 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp consumer, WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + // Validate the timestamp, then emit to the consumer registered for this tag. + checkTimerTimestamp(timestamp); + FnDataReceiver<WindowedValue<T>> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @Override public TimeDomain timeDomain() { return currentTimeDomain; @@ -3216,6 +3491,16 @@ public void output(Row output) { public void outputWithTimestamp(Row output, Instant timestamp) { context.outputWithTimestamp(fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue( + fromRowFunction.apply(output), timestamp, windows, paneInfo); + } }; @Override @@ -3248,6 +3533,15 @@ public void output(T output) { public void outputWithTimestamp(T output, Instant timestamp) { context.outputWithTimestamp(tag, output, timestamp); } + + @Override + public void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue(tag, output, timestamp, windows, paneInfo); + } }; } @@ -3279,6 +3573,16 @@ public void output(Row output) { public void outputWithTimestamp(Row output, Instant timestamp) { context.outputWithTimestamp(tag, fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue( + tag, fromRowFunction.apply(output), timestamp, windows, 
paneInfo); + } }; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index 8afcbd36d732..8f24ebc8ad9d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.time.Instant; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; @@ -67,7 +68,9 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -1092,6 +1095,15 @@ public void outputWithTimestamp( BigQueryStorageApiInsertError output, org.joda.time.Instant timestamp) { context.output(failedRowsTag, output, timestamp, GlobalWindow.INSTANCE); } + + @Override + public void outputWindowedValue( + BigQueryStorageApiInsertError output, + org.joda.time.Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + throw new UnsupportedOperationException("outputWindowedValue not supported"); + } }; @Nullable OutputReceiver successfulRowsReceiver = null; if (successfulRowsTag != null) { @@ -1106,6 +1118,15 @@ public void output(TableRow output) { public void outputWithTimestamp(TableRow output, org.joda.time.Instant timestamp) { context.output(successfulRowsTag, output, timestamp, 
GlobalWindow.INSTANCE); } + + @Override + public void outputWindowedValue( + TableRow output, + org.joda.time.Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + throw new UnsupportedOperationException("outputWindowedValue not supported"); + } }; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 6db79ab69b47..3128de45fde3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -56,6 +56,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -102,9 +103,11 @@ import org.apache.beam.sdk.transforms.Wait; import org.apache.beam.sdk.transforms.WithTimestamps; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.BackOff; import org.apache.beam.sdk.util.FluentBackoff; @@ -2002,6 +2005,15 @@ public void output(Iterable output) { public void outputWithTimestamp(Iterable output, Instant timestamp) { c.output(output, timestamp, GlobalWindow.INSTANCE); } + + @Override + public void outputWindowedValue( + Iterable output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + throw new UnsupportedOperationException("outputWindowedValue not supported"); + } } } diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 48b5b060a295..845d974af0b4 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -50,6 +50,8 @@ import org.apache.beam.sdk.transforms.errorhandling.BadRecord; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.DefaultErrorHandler; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -340,6 +342,15 @@ public void outputWithTimestamp( records.add(output); } + @Override + public void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + throw new UnsupportedOperationException("Not expecting outputWindowedValue"); + } + public List getOutputs() { return this.records; } diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java index 273a1915d2bb..b72fe423efb1 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java @@ -21,11 +21,14 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; import 
org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.internal.DefaultImplementation; @@ -173,6 +176,16 @@ public void outputWithTimestamp( records.add(output); } + @Override + public void outputWindowedValue( + PulsarMessage output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + throw new UnsupportedOperationException( + "unsupported outputWindowedValue in mock outputreceiver"); + } + public List getOutputs() { return records; } diff --git a/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java b/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java index 33827164c6b7..bb0e6524241d 100644 --- a/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java +++ b/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java @@ -22,12 +22,15 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.NonNull; import 
org.checkerframework.checker.nullness.qual.UnknownKeyFor; @@ -64,6 +67,16 @@ public void outputWithTimestamp( records.add(output); } + @Override + public void outputWindowedValue( + String output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + throw new UnsupportedOperationException( + "Not expecting to receive call to outputWindowedValue"); + } + public List getOutputs() { return this.records; }