Add translator for Flink DataStreams. (apache#30486)
This shares as much code as is reasonable with the DataSet translation paths.

Timestamp information is preserved, but windowing is not, owing to differences in how the two are expressed and propagated.
robertwb authored and hjtran committed Apr 4, 2024
1 parent bd4bd7f commit 1a50fdc
Showing 6 changed files with 586 additions and 39 deletions.
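Only three of the six changed files loaded below; the DataStream adapter entry point itself is among the files that did not load. As a rough sketch of what the commit enables, assuming a BeamFlinkDataStreamAdapter counterpart to the BeamFlinkDataSetAdapter visible in this diff (the adapter name and applyBeamPTransform signature are mirrored from the DataSet side, not confirmed by the loaded hunks):

    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BeamOnDataStreamSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = env.fromElements("flink", "beam", "flink");

        // Hypothetical: the DataStream adapter is in a file not shown in this view,
        // so this class name and method signature are assumptions.
        BeamFlinkDataStreamAdapter adapter = new BeamFlinkDataStreamAdapter();

        // Per the commit message, element timestamps survive the round trip,
        // but Beam windowing does not.
        DataStream<Integer> lengths =
            adapter.applyBeamPTransform(
                words, MapElements.into(TypeDescriptors.integers()).via((String s) -> s.length()));

        lengths.print();
        env.execute();
      }
    }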
FlinkStreamingPortablePipelineTranslator.java
@@ -145,6 +145,17 @@ public StreamingTranslationContext createTranslationContext(
     StreamExecutionEnvironment executionEnvironment =
         FlinkExecutionEnvironments.createStreamExecutionEnvironment(
             pipelineOptions, filesToStage, confDir);
+    return createTranslationContext(jobInfo, pipelineOptions, executionEnvironment);
+  }
+
+  /**
+   * Creates a streaming translation context. The resulting Flink execution dag will live in the
+   * given {@link StreamExecutionEnvironment}.
+   */
+  public StreamingTranslationContext createTranslationContext(
+      JobInfo jobInfo,
+      FlinkPipelineOptions pipelineOptions,
+      StreamExecutionEnvironment executionEnvironment) {
     return new StreamingTranslationContext(jobInfo, pipelineOptions, executionEnvironment);
   }

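The new overload above lets a caller hand the translator an already-configured StreamExecutionEnvironment rather than having one built from pipeline options. A minimal sketch, assuming the imports are in scope; the JobInfo values mirror the placeholders used later in this diff:

    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkStreamingPortablePipelineTranslator translator =
        new FlinkStreamingPortablePipelineTranslator();
    // Build a translation context whose Flink execution dag lives in `env`.
    FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context =
        translator.createTranslationContext(
            JobInfo.create(
                "unusedJobId",
                "unusedJobName",
                "unusedRetrievalToken",
                PipelineOptionsTranslation.toProto(options)),
            options,
            env);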
@@ -204,7 +215,7 @@ public <T> DataStream<T> getDataStreamOrThrow(String pCollectionId) {
     }
   }
 
-  interface PTransformTranslator<T> {
+  public interface PTransformTranslator<T> {
     void translate(String id, RunnerApi.Pipeline pipeline, T t);
   }

@@ -216,7 +227,12 @@ interface PTransformTranslator<T> {
   private final Map<String, PTransformTranslator<StreamingTranslationContext>>
       urnToTransformTranslator;
 
-  FlinkStreamingPortablePipelineTranslator() {
+  public FlinkStreamingPortablePipelineTranslator() {
+    this(ImmutableMap.of());
+  }
+
+  public FlinkStreamingPortablePipelineTranslator(
+      Map<String, PTransformTranslator<StreamingTranslationContext>> extraTranslations) {
     ImmutableMap.Builder<String, PTransformTranslator<StreamingTranslationContext>> translatorMap =
         ImmutableMap.builder();
     translatorMap.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, this::translateFlatten);
@@ -234,6 +250,8 @@ interface PTransformTranslator<T> {
     // For testing only
     translatorMap.put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, this::translateTestStream);
 
+    translatorMap.putAll(extraTranslations);
+
     this.urnToTransformTranslator = translatorMap.build();
   }

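The new extraTranslations constructor parameter is the hook that lets adapter code register translators for URNs the core translator does not know about, such as the FlinkInput/FlinkOutput URNs the batch adapter wires up further down. A hedged sketch (translator bodies elided; the exact streaming wiring is in files not loaded here):

    FlinkStreamingPortablePipelineTranslator translator =
        new FlinkStreamingPortablePipelineTranslator(
            ImmutableMap.of(
                FlinkInput.URN,
                (id, pipeline, context) -> {
                  // Wire the corresponding input DataStream into the context here.
                },
                FlinkOutput.URN,
                (id, pipeline, context) -> {
                  // Capture the translated output DataStream from the context here.
                }));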
BeamAdapterUtils.java
@@ -27,6 +27,7 @@
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.construction.Environments;
 import org.apache.beam.sdk.util.construction.PipelineTranslation;
 import org.apache.beam.sdk.util.construction.SdkComponents;
@@ -35,34 +36,37 @@
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
 class BeamAdapterUtils {
   private BeamAdapterUtils() {}
 
-  interface PipelineFragmentTranslator<DataSetOrStreamT> {
+  interface PipelineFragmentTranslator<DataSetOrStreamT, ExecutionEnvironmentT> {
     Map<String, DataSetOrStreamT> translate(
         Map<String, ? extends DataSetOrStreamT> inputs,
         RunnerApi.Pipeline pipelineProto,
-        ExecutionEnvironment executionEnvironment);
+        ExecutionEnvironmentT executionEnvironment);
   }
 
   @SuppressWarnings({"rawtypes"})
-  static <DataSetOrStreamT, BeamInputT extends PInput, BeamOutputT extends POutput>
+  static <
+          DataSetOrStreamT,
+          ExecutionEnvironmentT,
+          BeamInputT extends PInput,
+          BeamOutputT extends POutput>
       Map<String, DataSetOrStreamT> applyBeamPTransformInternal(
           Map<String, ? extends DataSetOrStreamT> inputs,
           BiFunction<Pipeline, Map<String, PCollection<?>>, BeamInputT> toBeamInput,
           Function<BeamOutputT, Map<String, PCollection<?>>> fromBeamOutput,
           PTransform<? super BeamInputT, BeamOutputT> transform,
-          ExecutionEnvironment executionEnvironment,
+          ExecutionEnvironmentT executionEnvironment,
           boolean isBounded,
           Function<DataSetOrStreamT, TypeInformation<?>> getTypeInformation,
           PipelineOptions pipelineOptions,
           CoderRegistry coderRegistry,
-          PipelineFragmentTranslator<DataSetOrStreamT> translator) {
+          PipelineFragmentTranslator<DataSetOrStreamT, ExecutionEnvironmentT> translator) {
     Pipeline pipeline = Pipeline.create();
 
     // Construct beam inputs corresponding to each Flink input.
@@ -76,8 +80,10 @@ Map<String, DataSetOrStreamT> applyBeamPTransformInternal(
                 new FlinkInput<>(
                     key,
                     BeamAdapterCoderUtils.typeInformationToCoder(
-                        getTypeInformation.apply(Preconditions.checkNotNull(flinkInput)),
-                        coderRegistry)))));
+                        getTypeInformation.apply(
+                            Preconditions.checkArgumentNotNull(flinkInput)),
+                        coderRegistry),
+                    isBounded))));
 
     // Actually apply the transform to create Beam outputs.
     Map<String, PCollection<?>> beamOutputs =
BeamFlinkDataSetAdapter.java
@@ -94,7 +94,7 @@ public <OutputT> DataSet<OutputT> applyBeamPTransform(
     return (DataSet)
         getNonNull(
             applyBeamPTransformInternal(
-                ImmutableMap.<String, DataSet<?>>of(),
+                ImmutableMap.of(),
                 (pipeline, map) -> PBegin.in(pipeline),
                 (output) -> ImmutableMap.of("output", output),
                 transform,
@@ -172,39 +172,34 @@ Map<String, DataSet<?>> applyBeamPTransformInternal(
       Function<BeamOutputT, Map<String, PCollection<?>>> fromBeamOutput,
       PTransform<? super BeamInputT, BeamOutputT> transform,
       ExecutionEnvironment executionEnvironment) {
-    return BeamAdapterUtils.<DataSet<?>, BeamInputT, BeamOutputT>applyBeamPTransformInternal(
+    return BeamAdapterUtils.applyBeamPTransformInternal(
         inputs,
         toBeamInput,
         fromBeamOutput,
         transform,
         executionEnvironment,
         true,
         dataSet -> dataSet.getType(),
         pipelineOptions,
         coderRegistry,
-        new BeamAdapterUtils.PipelineFragmentTranslator<DataSet<?>>() {
-          @Override
-          public Map<String, DataSet<?>> translate(
-              Map<String, ? extends DataSet<?>> inputs,
-              RunnerApi.Pipeline pipelineProto,
-              ExecutionEnvironment executionEnvironment) {
-            Map<String, DataSet<?>> outputs = new HashMap<>();
-            FlinkBatchPortablePipelineTranslator translator =
-                FlinkBatchPortablePipelineTranslator.createTranslator(
-                    ImmutableMap.of(
-                        FlinkInput.URN, flinkInputTranslator(inputs),
-                        FlinkOutput.URN, flinkOutputTranslator(outputs)));
-            FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
-                FlinkBatchPortablePipelineTranslator.createTranslationContext(
-                    JobInfo.create(
-                        "unusedJobId",
-                        "unusedJobName",
-                        "unusedRetrievalToken",
-                        PipelineOptionsTranslation.toProto(pipelineOptions)),
-                    pipelineOptions.as(FlinkPipelineOptions.class),
-                    executionEnvironment);
-            translator.translate(context, translator.prepareForTranslation(pipelineProto));
-            return outputs;
-          }
+        (flinkInputs, pipelineProto, env) -> {
+          Map<String, DataSet<?>> flinkOutputs = new HashMap<>();
+          FlinkBatchPortablePipelineTranslator translator =
+              FlinkBatchPortablePipelineTranslator.createTranslator(
+                  ImmutableMap.of(
+                      FlinkInput.URN, flinkInputTranslator(flinkInputs),
+                      FlinkOutput.URN, flinkOutputTranslator(flinkOutputs)));
+          FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+              FlinkBatchPortablePipelineTranslator.createTranslationContext(
+                  JobInfo.create(
+                      "unusedJobId",
+                      "unusedJobName",
+                      "unusedRetrievalToken",
+                      PipelineOptionsTranslation.toProto(pipelineOptions)),
+                  pipelineOptions.as(FlinkPipelineOptions.class),
+                  env);
+          translator.translate(context, translator.prepareForTranslation(pipelineProto));
+          return flinkOutputs;
         });
   }

[Diffs for the remaining three changed files did not load in this view.]
