From 1a50fdc19122ed851f39b27262b76e2f1d310e6d Mon Sep 17 00:00:00 2001
From: Robert Bradshaw
Date: Thu, 14 Mar 2024 12:33:44 -0700
Subject: [PATCH] Add translator for Flink DataStreams. (#30486)

This shares as much code as is reasonable with the DataSet translation
paths. Timestamp information is preserved, but windowing is not, due to
differences in how windows are expressed and propagated.
---
 ...nkStreamingPortablePipelineTranslator.java |  22 +-
 .../flink/adapter/BeamAdapterUtils.java       |  24 +-
 .../adapter/BeamFlinkDataSetAdapter.java      |  47 ++-
 .../adapter/BeamFlinkDataStreamAdapter.java   | 304 ++++++++++++++++++
 .../runners/flink/adapter/FlinkInput.java     |   7 +-
 .../BeamFlinkDataStreamAdapterTest.java       | 221 +++++++++++++
 6 files changed, 586 insertions(+), 39 deletions(-)
 create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java
 create mode 100644 runners/flink/src/test/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapterTest.java

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index c825806e7542..836c825300db 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -145,6 +145,17 @@ public StreamingTranslationContext createTranslationContext(
     StreamExecutionEnvironment executionEnvironment =
         FlinkExecutionEnvironments.createStreamExecutionEnvironment(
             pipelineOptions, filesToStage, confDir);
+    return createTranslationContext(jobInfo, pipelineOptions, executionEnvironment);
+  }
+
+  /**
+   * Creates a streaming translation context. The resulting Flink execution DAG will live in the
+   * given {@link StreamExecutionEnvironment}.
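+   *
+   * <p>A minimal sketch of intended use, assuming a {@code translator}, {@code jobInfo}, and
+   * {@code options} are already in hand (the names here are illustrative):
+   *
+   * <pre>{@code
+   * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   * StreamingTranslationContext context =
+   *     translator.createTranslationContext(jobInfo, options, env);
+   * }</pre>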
+   */
+  public StreamingTranslationContext createTranslationContext(
+      JobInfo jobInfo,
+      FlinkPipelineOptions pipelineOptions,
+      StreamExecutionEnvironment executionEnvironment) {
     return new StreamingTranslationContext(jobInfo, pipelineOptions, executionEnvironment);
   }
 
@@ -204,7 +215,7 @@ public <T> DataStream<T> getDataStreamOrThrow(String pCollectionId) {
     }
   }
 
-  interface PTransformTranslator<T> {
+  public interface PTransformTranslator<T> {
     void translate(String id, RunnerApi.Pipeline pipeline, T t);
   }
 
@@ -216,7 +227,12 @@ interface PTransformTranslator<T> {
   private final Map<String, PTransformTranslator<StreamingTranslationContext>>
       urnToTransformTranslator;
 
-  FlinkStreamingPortablePipelineTranslator() {
+  public FlinkStreamingPortablePipelineTranslator() {
+    this(ImmutableMap.of());
+  }
+
+  public FlinkStreamingPortablePipelineTranslator(
+      Map<String, PTransformTranslator<StreamingTranslationContext>> extraTranslations) {
     ImmutableMap.Builder<String, PTransformTranslator<StreamingTranslationContext>>
         translatorMap = ImmutableMap.builder();
     translatorMap.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, this::translateFlatten);
@@ -234,6 +250,8 @@ interface PTransformTranslator<T> {
     // For testing only
     translatorMap.put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, this::translateTestStream);
 
+    translatorMap.putAll(extraTranslations);
+
     this.urnToTransformTranslator = translatorMap.build();
   }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamAdapterUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamAdapterUtils.java
index 46e91c7598a0..982dabe2dd78 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamAdapterUtils.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamAdapterUtils.java
@@ -27,6 +27,7 @@
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.construction.Environments;
 import org.apache.beam.sdk.util.construction.PipelineTranslation;
 import org.apache.beam.sdk.util.construction.SdkComponents;
@@ -35,34 +36,37 @@
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
 class BeamAdapterUtils {
   private BeamAdapterUtils() {}
 
-  interface PipelineFragmentTranslator<DataSetOrStreamT> {
+  interface PipelineFragmentTranslator<DataSetOrStreamT, ExecutionEnvironmentT> {
     Map<String, DataSetOrStreamT> translate(
         Map<String, ? extends DataSetOrStreamT> inputs,
         RunnerApi.Pipeline pipelineProto,
-        ExecutionEnvironment executionEnvironment);
+        ExecutionEnvironmentT executionEnvironment);
   }
 
   @SuppressWarnings({"rawtypes"})
-  static <DataSetOrStreamT, BeamInputT extends PInput, BeamOutputT extends POutput>
+  static <
+          DataSetOrStreamT,
+          ExecutionEnvironmentT,
+          BeamInputT extends PInput,
+          BeamOutputT extends POutput>
       Map<String, DataSetOrStreamT> applyBeamPTransformInternal(
           Map<String, ? extends DataSetOrStreamT> inputs,
           BiFunction<Pipeline, Map<String, PCollection<?>>, BeamInputT> toBeamInput,
           Function<BeamOutputT, Map<String, PCollection<?>>> fromBeamOutput,
           PTransform<BeamInputT, BeamOutputT> transform,
-          ExecutionEnvironment executionEnvironment,
+          ExecutionEnvironmentT executionEnvironment,
+          boolean isBounded,
           Function<DataSetOrStreamT, TypeInformation<?>> getTypeInformation,
           PipelineOptions pipelineOptions,
           CoderRegistry coderRegistry,
-          PipelineFragmentTranslator<DataSetOrStreamT> translator) {
+          PipelineFragmentTranslator<DataSetOrStreamT, ExecutionEnvironmentT> translator) {
     Pipeline pipeline = Pipeline.create();
 
     // Construct beam inputs corresponding to each Flink input.
@@ -76,8 +80,10 @@ Map<String, DataSetOrStreamT> applyBeamPTransformInternal(
                   new FlinkInput<>(
                       key,
                       BeamAdapterCoderUtils.typeInformationToCoder(
-                          getTypeInformation.apply(Preconditions.checkNotNull(flinkInput)),
-                          coderRegistry)))));
+                          getTypeInformation.apply(
+                              Preconditions.checkArgumentNotNull(flinkInput)),
+                          coderRegistry),
+                      isBounded)))));
 
     // Actually apply the transform to create Beam outputs.
     Map<String, PCollection<?>> beamOutputs =
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java
index f3e20f3d9a94..d7840067e7c4 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java
@@ -94,7 +94,7 @@ public <OutputT> DataSet<OutputT> applyBeamPTransform(
     return (DataSet<OutputT>)
         getNonNull(
             applyBeamPTransformInternal(
-                ImmutableMap.<String, DataSet<?>>of(),
+                ImmutableMap.of(),
                 (pipeline, map) -> PBegin.in(pipeline),
                 (output) -> ImmutableMap.of("output", output),
                 transform,
@@ -172,39 +172,34 @@ Map<String, DataSet<?>> applyBeamPTransformInternal(
       Function<BeamOutputT, Map<String, PCollection<?>>> fromBeamOutput,
       PTransform<BeamInputT, BeamOutputT> transform,
       ExecutionEnvironment executionEnvironment) {
-    return BeamAdapterUtils.<DataSet<?>, BeamInputT, BeamOutputT>applyBeamPTransformInternal(
+    return BeamAdapterUtils.applyBeamPTransformInternal(
         inputs,
         toBeamInput,
         fromBeamOutput,
         transform,
         executionEnvironment,
+        true,
         dataSet -> dataSet.getType(),
         pipelineOptions,
         coderRegistry,
-        new BeamAdapterUtils.PipelineFragmentTranslator<DataSet<?>>() {
-          @Override
-          public Map<String, DataSet<?>> translate(
-              Map<String, ? extends DataSet<?>> inputs,
-              RunnerApi.Pipeline pipelineProto,
-              ExecutionEnvironment executionEnvironment) {
-            Map<String, DataSet<?>> outputs = new HashMap<>();
-            FlinkBatchPortablePipelineTranslator translator =
-                FlinkBatchPortablePipelineTranslator.createTranslator(
-                    ImmutableMap.of(
-                        FlinkInput.URN, flinkInputTranslator(inputs),
-                        FlinkOutput.URN, flinkOutputTranslator(outputs)));
-            FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
-                FlinkBatchPortablePipelineTranslator.createTranslationContext(
-                    JobInfo.create(
-                        "unusedJobId",
-                        "unusedJobName",
-                        "unusedRetrievalToken",
-                        PipelineOptionsTranslation.toProto(pipelineOptions)),
-                    pipelineOptions.as(FlinkPipelineOptions.class),
-                    executionEnvironment);
-            translator.translate(context, translator.prepareForTranslation(pipelineProto));
-            return outputs;
-          }
+        (flinkInputs, pipelineProto, env) -> {
+          Map<String, DataSet<?>> flinkOutputs = new HashMap<>();
+          FlinkBatchPortablePipelineTranslator translator =
+              FlinkBatchPortablePipelineTranslator.createTranslator(
+                  ImmutableMap.of(
+                      FlinkInput.URN, flinkInputTranslator(flinkInputs),
+                      FlinkOutput.URN, flinkOutputTranslator(flinkOutputs)));
+          FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+              FlinkBatchPortablePipelineTranslator.createTranslationContext(
+                  JobInfo.create(
+                      "unusedJobId",
+                      "unusedJobName",
+                      "unusedRetrievalToken",
+                      PipelineOptionsTranslation.toProto(pipelineOptions)),
+                  pipelineOptions.as(FlinkPipelineOptions.class),
+                  env);
+          translator.translate(context, translator.prepareForTranslation(pipelineProto));
+          return flinkOutputs;
         });
   }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java
new file mode 100644
index 000000000000..d21bf89d40b2
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.adapter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * An adapter class that allows one to apply Apache Beam PTransforms directly to Flink DataStreams.
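+ *
+ * <p>A minimal sketch of intended use (here {@code withPrefix} is an illustrative stand-in for
+ * any {@code PTransform<PCollection<String>, PCollection<String>>}):
+ *
+ * <pre>{@code
+ * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ * DataStream<String> input = env.fromElements("a", "b", "c");
+ * DataStream<String> result =
+ *     new BeamFlinkDataStreamAdapter().applyBeamPTransform(input, withPrefix("x"));
+ * }</pre>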
+ */
+public class BeamFlinkDataStreamAdapter {
+  private final PipelineOptions pipelineOptions;
+  private final CoderRegistry coderRegistry = CoderRegistry.createDefault();
+
+  public BeamFlinkDataStreamAdapter() {
+    this(PipelineOptionsFactory.create());
+  }
+
+  public BeamFlinkDataStreamAdapter(PipelineOptions pipelineOptions) {
+    this.pipelineOptions = pipelineOptions;
+  }
+
+  public <InputT, OutputT, CollectionT extends PCollection<? extends InputT>>
+      DataStream<OutputT> applyBeamPTransform(
+          DataStream<InputT> input, PTransform<CollectionT, PCollection<OutputT>> transform) {
+    return (DataStream<OutputT>)
+        getNonNull(
+            applyBeamPTransformInternal(
+                ImmutableMap.of("input", input),
+                (pipeline, map) -> (CollectionT) getNonNull(map, "input"),
+                (output) -> ImmutableMap.of("output", output),
+                transform,
+                input.getExecutionEnvironment()),
+            "output");
+  }
+
+  public <OutputT> DataStream<OutputT> applyBeamPTransform(
+      Map<String, ? extends DataStream<?>> inputs,
+      PTransform<PCollectionTuple, PCollection<OutputT>> transform) {
+    return (DataStream<OutputT>)
+        getNonNull(
+            applyBeamPTransformInternal(
+                inputs,
+                BeamAdapterUtils::mapToTuple,
+                (output) -> ImmutableMap.of("output", output),
+                transform,
+                inputs.values().stream().findAny().get().getExecutionEnvironment()),
+            "output");
+  }
+
+  public <OutputT> DataStream<OutputT> applyBeamPTransform(
+      StreamExecutionEnvironment executionEnvironment,
+      PTransform<PBegin, PCollection<OutputT>> transform) {
+    return (DataStream<OutputT>)
+        getNonNull(
+            applyBeamPTransformInternal(
+                ImmutableMap.of(),
+                (pipeline, map) -> PBegin.in(pipeline),
+                (output) -> ImmutableMap.of("output", output),
+                transform,
+                executionEnvironment),
+            "output");
+  }
+
+  public <InputT, CollectionT extends PCollection<? extends InputT>>
+      Map<String, DataStream<?>> applyMultiOutputBeamPTransform(
+          DataStream<InputT> input, PTransform<CollectionT, PCollectionTuple> transform) {
+    return applyBeamPTransformInternal(
+        ImmutableMap.of("input", input),
+        (pipeline, map) -> (CollectionT) getNonNull(map, "input"),
+        BeamAdapterUtils::tupleToMap,
+        transform,
+        input.getExecutionEnvironment());
+  }
+
+  public Map<String, DataStream<?>> applyMultiOutputBeamPTransform(
+      Map<String, ? extends DataStream<?>> inputs,
+      PTransform<PCollectionTuple, PCollectionTuple> transform) {
+    return applyBeamPTransformInternal(
+        inputs,
+        BeamAdapterUtils::mapToTuple,
+        BeamAdapterUtils::tupleToMap,
+        transform,
+        inputs.values().stream().findAny().get().getExecutionEnvironment());
+  }
+
+  public Map<String, DataStream<?>> applyMultiOutputBeamPTransform(
+      StreamExecutionEnvironment executionEnvironment,
+      PTransform<PBegin, PCollectionTuple> transform) {
+    return applyBeamPTransformInternal(
+        ImmutableMap.of(),
+        (pipeline, map) -> PBegin.in(pipeline),
+        BeamAdapterUtils::tupleToMap,
+        transform,
+        executionEnvironment);
+  }
+
+  public <InputT, CollectionT extends PCollection<? extends InputT>>
+      void applyNoOutputBeamPTransform(
+          DataStream<InputT> input, PTransform<CollectionT, PDone> transform) {
+    applyBeamPTransformInternal(
+        ImmutableMap.of("input", input),
+        (pipeline, map) -> (CollectionT) getNonNull(map, "input"),
+        pDone -> ImmutableMap.of(),
+        transform,
+        input.getExecutionEnvironment());
+  }
+
+  public void applyNoOutputBeamPTransform(
+      Map<String, ? extends DataStream<?>> inputs, PTransform<PCollectionTuple, PDone> transform) {
+    applyBeamPTransformInternal(
+        inputs,
+        BeamAdapterUtils::mapToTuple,
+        pDone -> ImmutableMap.of(),
+        transform,
+        inputs.values().stream().findAny().get().getExecutionEnvironment());
+  }
+
+  public void applyNoOutputBeamPTransform(
+      StreamExecutionEnvironment executionEnvironment, PTransform<PBegin, PDone> transform) {
+    applyBeamPTransformInternal(
+        ImmutableMap.of(),
+        (pipeline, map) -> PBegin.in(pipeline),
+        pDone -> ImmutableMap.of(),
+        transform,
+        executionEnvironment);
+  }
+
+  private <BeamInputT extends PInput, BeamOutputT extends POutput>
+      Map<String, DataStream<?>> applyBeamPTransformInternal(
+          Map<String, ? extends DataStream<?>> inputs,
+          BiFunction<Pipeline, Map<String, PCollection<?>>, BeamInputT> toBeamInput,
+          Function<BeamOutputT, Map<String, PCollection<?>>> fromBeamOutput,
+          PTransform<BeamInputT, BeamOutputT> transform,
+          StreamExecutionEnvironment executionEnvironment) {
+    return BeamAdapterUtils.applyBeamPTransformInternal(
+        inputs,
+        toBeamInput,
+        fromBeamOutput,
+        transform,
+        executionEnvironment,
+        false,
+        dataStream -> dataStream.getType(),
+        pipelineOptions,
+        coderRegistry,
+        (flinkInputs, pipelineProto, env) -> {
+          Map<String, DataStream<?>> flinkOutputs = new HashMap<>();
+          FlinkStreamingPortablePipelineTranslator translator =
+              new FlinkStreamingPortablePipelineTranslator(
+                  ImmutableMap.of(
+                      FlinkInput.URN, flinkInputTranslator(flinkInputs),
+                      FlinkOutput.URN, flinkOutputTranslator(flinkOutputs)));
+          FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context =
+              translator.createTranslationContext(
+                  JobInfo.create(
+                      "unusedJobId",
+                      "unusedJobName",
+                      "unusedRetrievalToken",
+                      PipelineOptionsTranslation.toProto(pipelineOptions)),
+                  pipelineOptions.as(FlinkPipelineOptions.class),
+                  env);
+          translator.translate(context, translator.prepareForTranslation(pipelineProto));
+          return flinkOutputs;
+        });
+  }
+
+  private <InputT>
+      FlinkStreamingPortablePipelineTranslator.PTransformTranslator<
+              FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext>
+          flinkInputTranslator(Map<String, ? extends DataStream<?>> inputMap) {
+    return (String id,
+        RunnerApi.Pipeline p,
+        FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context) -> {
+      // When we run into a FlinkInput operator, it "produces" the corresponding input as its
+      // "computed result."
+      RunnerApi.PTransform transform = p.getComponents().getTransformsOrThrow(id);
+      String inputId = transform.getSpec().getPayload().toStringUtf8();
+      DataStream<InputT> flinkInput =
+          Preconditions.checkStateNotNull(
+              (DataStream<InputT>) inputMap.get(inputId),
+              "missing input referenced in proto: " + inputId);
+      context.addDataStream(
+          Iterables.getOnlyElement(transform.getOutputsMap().values()),
+          flinkInput.process(
+              new ProcessFunction<InputT, WindowedValue<InputT>>() {
+                @Override
+                public void processElement(
+                    InputT value,
+                    ProcessFunction<InputT, WindowedValue<InputT>>.Context ctx,
+                    Collector<WindowedValue<InputT>> out)
+                    throws Exception {
+                  out.collect(
+                      WindowedValue.timestampedValueInGlobalWindow(
+                          value,
+                          ctx.timestamp() == null
+                              ? BoundedWindow.TIMESTAMP_MIN_VALUE
+                              : Instant.ofEpochMilli(ctx.timestamp())));
+                }
+              },
+              BeamAdapterCoderUtils.coderToTypeInformation(
+                  WindowedValue.getFullCoder(
+                      BeamAdapterCoderUtils.typeInformationToCoder(
+                          flinkInput.getType(), coderRegistry),
+                      GlobalWindow.Coder.INSTANCE),
+                  pipelineOptions)));
+    };
+  }
+
+  private <OutputT>
+      FlinkStreamingPortablePipelineTranslator.PTransformTranslator<
+              FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext>
+          flinkOutputTranslator(Map<String, DataStream<?>> outputMap) {
+    return (String id,
+        RunnerApi.Pipeline p,
+        FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context) -> {
+      // When we run into a FlinkOutput operator, we cache the computed PCollection to return to
+      // the user.
+      RunnerApi.PTransform transform = p.getComponents().getTransformsOrThrow(id);
+      DataStream<WindowedValue<OutputT>> inputDataStream =
+          context.getDataStreamOrThrow(
+              Iterables.getOnlyElement(transform.getInputsMap().values()));
+      String outputId = transform.getSpec().getPayload().toStringUtf8();
+      Coder<OutputT> outputCoder =
+          BeamAdapterCoderUtils.lookupCoder(
+              p, Iterables.getOnlyElement(transform.getInputsMap().values()));
+      // TODO(robertwb): Also handle or disable length prefix coding (for embedded mode at least).
+      outputMap.put(
+          outputId,
+          inputDataStream.transform(
+              "StripWindows",
+              BeamAdapterCoderUtils.coderToTypeInformation(outputCoder, pipelineOptions),
+              new UnwrapWindowOperator<>()));
+    };
+  }
+
+  /**
+   * Forwards the Beam timestamps to the underlying Flink timestamps, but unlike {@link
+   * DataStream#assignTimestampsAndWatermarks(WatermarkStrategy)} does not discard the underlying
+   * watermark signals.
+   *
+   * @param <T> user element type
+   */
+  private static class UnwrapWindowOperator<T> extends AbstractStreamOperator<T>
+      implements OneInputStreamOperator<WindowedValue<T>, T> {
+    @Override
+    public void processElement(StreamRecord<WindowedValue<T>> element) {
+      output.collect(
+          element.replace(
+              element.getValue().getValue(), element.getValue().getTimestamp().getMillis()));
+    }
+  }
+
+  private <K, V> V getNonNull(Map<K, V> map, K key) {
+    return Preconditions.checkStateNotNull(map.get(Preconditions.checkArgumentNotNull(key)));
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkInput.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkInput.java
index 7ee261ca4b5f..20ff455f09c7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkInput.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkInput.java
@@ -46,9 +46,12 @@
 
   private final Coder<T> coder;
 
-  FlinkInput(String identifier, Coder<T> coder) {
+  private final boolean isBounded;
+
+  FlinkInput(String identifier, Coder<T> coder, boolean isBounded) {
     this.identifier = identifier;
     this.coder = coder;
+    this.isBounded = isBounded;
   }
 
   @Override
@@ -56,7 +59,7 @@ public PCollection<T> expand(PBegin input) {
     return PCollection.createPrimitiveOutputInternal(
         input.getPipeline(),
         WindowingStrategy.globalDefault(),
-        PCollection.IsBounded.BOUNDED,
+        isBounded ? PCollection.IsBounded.BOUNDED : PCollection.IsBounded.UNBOUNDED,
         coder);
   }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapterTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapterTest.java
new file mode 100644
index 000000000000..719b28523487
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapterTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.adapter;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+public class BeamFlinkDataStreamAdapterTest {
+
+  private static PTransform<PCollection<? extends String>, PCollection<String>> withPrefix(
+      String prefix) {
+    return ParDo.of(
+        new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(@Element String word, OutputReceiver<String> out) {
+            out.output(prefix + word);
+          }
+        });
+  }
+
+  @Test
+  public void testApplySimpleTransform() throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+    DataStream<String> input = env.fromCollection(ImmutableList.of("a", "b", "c"));
+    DataStream<String> result =
+        new BeamFlinkDataStreamAdapter().applyBeamPTransform(input, withPrefix("x"));
+
+    assertThat(
+        ImmutableList.copyOf(result.executeAndCollect()), containsInAnyOrder("xa", "xb", "xc"));
+  }
+
+  @Test
+  public void testApplyCompositeTransform() throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+    DataStream<String> input = env.fromCollection(ImmutableList.of("a", "b", "c"));
+    DataStream<String> result =
+        new BeamFlinkDataStreamAdapter()
+            .applyBeamPTransform(
+                input,
+                new PTransform<PCollection<String>, PCollection<String>>() {
+                  @Override
+                  public PCollection<String> expand(PCollection<String> input) {
+                    return input.apply(withPrefix("x")).apply(withPrefix("y"));
+                  }
+                });
+
+    assertThat(
+        ImmutableList.copyOf(result.executeAndCollect()), containsInAnyOrder("yxa", "yxb", "yxc"));
+  }
+
+  @Test
+  public void testApplyMultiInputTransform() throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+    DataStream<String> input1 = env.fromCollection(ImmutableList.of("a", "b", "c"));
+    DataStream<String> input2 = env.fromCollection(ImmutableList.of("d", "e", "f"));
+    DataStream<String> result =
+        new BeamFlinkDataStreamAdapter()
+            .applyBeamPTransform(
+                ImmutableMap.of("x", input1, "y", input2),
+                new PTransform<PCollectionTuple, PCollection<String>>() {
+                  @Override
+                  public PCollection<String> expand(PCollectionTuple input) {
+                    return PCollectionList.of(input.<String>get("x").apply(withPrefix("x")))
+                        .and(input.<String>get("y").apply(withPrefix("y")))
+                        .apply(Flatten.pCollections());
+                  }
+                });
+
+    assertThat(
+        ImmutableList.copyOf(result.executeAndCollect()),
+        containsInAnyOrder("xa", "xb", "xc", "yd", "ye", "yf"));
+  }
+
+  @Test
+  public void testApplyMultiOutputTransform() throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+    DataStream<String> input = env.fromCollection(ImmutableList.of("a", "b", "c"));
+    Map<String, DataStream<?>> result =
+        new BeamFlinkDataStreamAdapter()
+            .applyMultiOutputBeamPTransform(
+                input,
+                new PTransform<PCollection<String>, PCollectionTuple>() {
+                  @Override
+                  public PCollectionTuple expand(PCollection<String> input) {
+                    return PCollectionTuple.of("x", input.apply(withPrefix("x")))
+                        .and("y", input.apply(withPrefix("y")));
+                  }
+                });
+
+    assertThat(
+        ImmutableList.copyOf(result.get("x").executeAndCollect()),
+        containsInAnyOrder("xa", "xb", "xc"));
+    assertThat(
+        ImmutableList.copyOf(result.get("y").executeAndCollect()),
+        containsInAnyOrder("ya", "yb", "yc"));
+  }
+
+  @Test
+  public void testApplyGroupingTransform() throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+    DataStream<String> input = env.fromCollection(ImmutableList.of("a", "a", "b"));
+    DataStream<KV<String, Long>> result =
+        new BeamFlinkDataStreamAdapter()
+            .applyBeamPTransform(
+                input,
+                new PTransform<PCollection<String>, PCollection<KV<String, Long>>>() {
+                  @Override
+                  public PCollection<KV<String, Long>> expand(PCollection<String> input) {
+                    return input
+                        .apply(Window.into(FixedWindows.of(Duration.millis(10))))
+                        .apply(Count.perElement());
+                  }
+                });
+
+    assertThat(
+        ImmutableList.copyOf(result.executeAndCollect()),
+        containsInAnyOrder(KV.of("a", 2L), KV.of("b", 1L)));
+  }
+
+  @Test
+  public void testApplyPreservesInputTimestamps() throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    DataStream<Long> input =
+        env.fromCollection(ImmutableList.of(1L, 2L, 12L))
+            .assignTimestampsAndWatermarks(
+                WatermarkStrategy.<Long>forBoundedOutOfOrderness(java.time.Duration.ofMillis(100))
+                    .withTimestampAssigner(
+                        (SerializableTimestampAssigner<Long>)
+                            (element, recordTimestamp) -> element));
+    DataStream<Long> result =
+        new BeamFlinkDataStreamAdapter()
+            .applyBeamPTransform(
+                input,
+                new PTransform<PCollection<Long>, PCollection<Long>>() {
+                  @Override
+                  public PCollection<Long> expand(PCollection<Long> input) {
+                    return input
+                        .apply(Window.into(FixedWindows.of(Duration.millis(10))))
+                        .apply(Sum.longsGlobally().withoutDefaults());
+                  }
+                });
+
+    assertThat(ImmutableList.copyOf(result.executeAndCollect()), containsInAnyOrder(3L, 12L));
+  }
+
+  @Test
+  public void testApplyPreservesOutputTimestamps() throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    DataStream<Long> input = env.fromCollection(ImmutableList.of(1L, 2L, 12L));
+    DataStream<Long> withTimestamps =
+        new BeamFlinkDataStreamAdapter()
+            .applyBeamPTransform(
+                input,
+                new PTransform<PCollection<Long>, PCollection<Long>>() {
+                  @Override
+                  public PCollection<Long> expand(PCollection<Long> input) {
+                    return input.apply(WithTimestamps.of(x -> Instant.ofEpochMilli(x)));
+                  }
+                });
+
+    assertThat(
+        ImmutableList.copyOf(
+            withTimestamps
+                .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(10)))
+                .reduce((ReduceFunction<Long>) (a, b) -> a + b)
+                .executeAndCollect()),
+        containsInAnyOrder(3L, 12L));
+  }
+}