Commit

Merge pull request apache#30488: Reduce nullness checks in flink adapters.
robertwb authored Mar 7, 2024
2 parents 363731e + 77f9e3a commit 45d5fea
Showing 4 changed files with 31 additions and 26 deletions.
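The common thread across all four files: blanket @SuppressWarnings("nullness") annotations are replaced with explicit null checks that the Checker Framework can verify and that fail fast at runtime. A minimal sketch of the before/after pattern (the class, method, and map names here are illustrative, not taken from the diff):

    import java.util.Map;

    class NullnessPatternSketch {
      // Before: the checker is silenced for the whole method.
      @SuppressWarnings("nullness")
      static String before(Map<String, String> outputs) {
        return outputs.get("output"); // Map.get may return null; the warning is hidden.
      }

      // After: an explicit check that satisfies the checker and fails fast at runtime.
      static String after(Map<String, String> outputs) {
        return org.apache.beam.sdk.util.Preconditions.checkStateNotNull(outputs.get("output"));
      }
    }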
File 1 of 4:

@@ -35,6 +35,7 @@
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -50,7 +51,7 @@ Map<String, DataSetOrStreamT> translate(
           ExecutionEnvironment executionEnvironment);
   }
 
-  @SuppressWarnings({"nullness", "rawtypes"})
+  @SuppressWarnings({"rawtypes"})
   static <DataSetOrStreamT, BeamInputT extends PInput, BeamOutputT extends POutput>
       Map<String, DataSetOrStreamT> applyBeamPTransformInternal(
           Map<String, ? extends DataSetOrStreamT> inputs,
@@ -75,7 +76,8 @@ Map<String, DataSetOrStreamT> applyBeamPTransformInternal(
                         new FlinkInput<>(
                             key,
                             BeamAdapterCoderUtils.typeInformationToCoder(
-                                getTypeInformation.apply(flinkInput), coderRegistry)))));
+                                getTypeInformation.apply(Preconditions.checkNotNull(flinkInput)),
+                                coderRegistry)))));
 
     // Actually apply the transform to create Beam outputs.
     Map<String, PCollection<?>> beamOutputs =
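A note on the change above: the vendored Guava Preconditions.checkNotNull throws NullPointerException on a null argument and otherwise returns the argument itself, which is what lets it wrap flinkInput inline inside the lambda instead of requiring a separate if/throw block. A small sketch of that inline idiom (the class and method names are hypothetical):

    import java.util.Map;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;

    class InlineCheckSketch {
      // checkNotNull composes inside a larger expression because it returns its input.
      static int payloadSize(Map<String, byte[]> payloads, String key) {
        return Preconditions.checkNotNull(payloads.get(key)).length;
      }
    }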
File 2 of 4 (BeamFlinkDataSetAdapter):

@@ -31,6 +31,7 @@
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
 import org.apache.beam.sdk.util.construction.graph.PipelineNode;
@@ -59,55 +60,54 @@ public BeamFlinkDataSetAdapter(PipelineOptions pipelineOptions) {
     this.pipelineOptions = pipelineOptions;
   }
 
-  @SuppressWarnings("nullness")
   public <InputT, OutputT, CollectionT extends PCollection<? extends InputT>>
       DataSet<OutputT> applyBeamPTransform(
           DataSet<InputT> input, PTransform<CollectionT, PCollection<OutputT>> transform) {
     return (DataSet)
-        this.<CollectionT, PCollection<OutputT>>applyBeamPTransformInternal(
+        getNonNull(
+            applyBeamPTransformInternal(
                 ImmutableMap.of("input", input),
-                (pipeline, map) -> (CollectionT) map.get("input"),
+                (pipeline, map) -> (CollectionT) getNonNull(map, "input"),
                 (output) -> ImmutableMap.of("output", output),
                 transform,
-                input.getExecutionEnvironment())
-            .get("output");
+                input.getExecutionEnvironment()),
+            "output");
   }
 
-  @SuppressWarnings("nullness")
   public <OutputT> DataSet<OutputT> applyBeamPTransform(
       Map<String, ? extends DataSet<?>> inputs,
       PTransform<PCollectionTuple, PCollection<OutputT>> transform) {
     return (DataSet)
-        applyBeamPTransformInternal(
+        getNonNull(
+            applyBeamPTransformInternal(
                 inputs,
                 BeamAdapterUtils::mapToTuple,
                 (output) -> ImmutableMap.of("output", output),
                 transform,
-                inputs.values().stream().findAny().get().getExecutionEnvironment())
-            .get("output");
+                inputs.values().stream().findAny().get().getExecutionEnvironment()),
+            "output");
   }
 
-  @SuppressWarnings("nullness")
   public <OutputT> DataSet<OutputT> applyBeamPTransform(
       ExecutionEnvironment executionEnvironment,
       PTransform<PBegin, PCollection<OutputT>> transform) {
     return (DataSet)
-        applyBeamPTransformInternal(
+        getNonNull(
+            applyBeamPTransformInternal(
                 ImmutableMap.<String, DataSet<?>>of(),
                 (pipeline, map) -> PBegin.in(pipeline),
                 (output) -> ImmutableMap.of("output", output),
                 transform,
-                executionEnvironment)
-            .get("output");
+                executionEnvironment),
+            "output");
   }
 
-  @SuppressWarnings("nullness")
   public <InputT, CollectionT extends PCollection<? extends InputT>>
       Map<String, DataSet<?>> applyMultiOutputBeamPTransform(
           DataSet<InputT> input, PTransform<CollectionT, PCollectionTuple> transform) {
     return applyBeamPTransformInternal(
         ImmutableMap.of("input", input),
-        (pipeline, map) -> (CollectionT) map.get("input"),
+        (pipeline, map) -> (CollectionT) getNonNull(map, "input"),
         BeamAdapterUtils::tupleToMap,
         transform,
         input.getExecutionEnvironment());
@@ -134,13 +134,12 @@ public Map<String, DataSet<?>> applyMultiOutputBeamPTransform(
         executionEnvironment);
   }
 
-  @SuppressWarnings("nullness")
   public <InputT, CollectionT extends PCollection<? extends InputT>>
       void applyNoOutputBeamPTransform(
           DataSet<InputT> input, PTransform<CollectionT, PDone> transform) {
     applyBeamPTransformInternal(
         ImmutableMap.of("input", input),
-        (pipeline, map) -> (CollectionT) map.get("input"),
+        (pipeline, map) -> (CollectionT) getNonNull(map, "input"),
         pDone -> ImmutableMap.of(),
         transform,
         input.getExecutionEnvironment());
@@ -217,11 +216,11 @@ private <InputT> FlinkBatchPortablePipelineTranslator.PTransformTranslator flink
           // When we run into a FlinkInput operator, it "produces" the corresponding input as its
           // "computed result."
           String inputId = t.getTransform().getSpec().getPayload().toStringUtf8();
-          DataSet<InputT> flinkInput = (DataSet<InputT>) inputMap.get(inputId);
-          // To make the nullness checker happy...
-          if (flinkInput == null) {
-            throw new IllegalStateException("Missing input: " + inputId);
-          }
+          DataSet<InputT> flinkInput =
+              org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
+                  (DataSet<InputT>) inputMap.get(inputId),
+                  "missing input referenced in proto: ",
+                  inputId);
           context.addDataSet(
               Iterables.getOnlyElement(t.getTransform().getOutputsMap().values()),
               // new MapOperator(...) rather than .map to manually designate the type information.
@@ -262,4 +261,8 @@ private <InputT> FlinkBatchPortablePipelineTranslator.PTransformTranslator flink
             "StripWindows"));
     };
   }
+
+  private <K, V> V getNonNull(Map<K, V> map, K key) {
+    return Preconditions.checkStateNotNull(map.get(Preconditions.checkArgumentNotNull(key)));
+  }
 }
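The new private getNonNull helper centralizes the map-lookup idiom used throughout this file. Unlike the vendored Guava class, org.apache.beam.sdk.util.Preconditions is annotated for the Checker Framework, so the value it returns is statically known non-null; checkArgumentNotNull rejects a null key with IllegalArgumentException and checkStateNotNull rejects a missing entry with IllegalStateException. A standalone sketch of the same shape (the demo names are hypothetical):

    import java.util.Map;
    import org.apache.beam.sdk.util.Preconditions;

    class GetNonNullSketch {
      // Same shape as the private helper added above, shown standalone.
      static <K, V> V getNonNull(Map<K, V> map, K key) {
        return Preconditions.checkStateNotNull(map.get(Preconditions.checkArgumentNotNull(key)));
      }

      static int demo(Map<String, Integer> counts) {
        // Throws IllegalArgumentException for a null key,
        // IllegalStateException when the entry is missing or null.
        return getNonNull(counts, "output");
      }
    }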
File 3 of 4 (FlinkInput):

@@ -61,7 +61,6 @@ public PCollection<T> expand(PBegin input) {
   }
 
   // This Translator translates this kind of PTransform into a Beam proto representation.
-  @SuppressWarnings("nullness")
   @AutoService(TransformPayloadTranslatorRegistrar.class)
   public static class Translator
       implements PTransformTranslation.TransformPayloadTranslator<FlinkInput<?>>,
@@ -72,6 +71,7 @@ public String getUrn() {
     }
 
     @Override
+    @SuppressWarnings("nullness") // TODO(https://github.com/apache/beam/issues/20497)
     public RunnerApi.FunctionSpec translate(
         AppliedPTransform<?, ?, FlinkInput<?>> application, SdkComponents components)
         throws IOException {
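In FlinkInput above, and identically in FlinkOutput below, the class-level suppression on Translator is narrowed to the single translate method that still needs it, now tagged with a TODO referencing https://github.com/apache/beam/issues/20497. Any new nullness issue elsewhere in the class is therefore flagged rather than silently swallowed. An illustrative sketch of that scoping pattern (the class and method are hypothetical):

    import org.checkerframework.checker.nullness.qual.Nullable;

    class ScopedSuppression {
      @SuppressWarnings("nullness") // TODO(https://github.com/apache/beam/issues/20497)
      String knownFalsePositive(@Nullable String s) {
        return s.toString(); // checker warning suppressed here only, not class-wide
      }
    }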
File 4 of 4 (FlinkOutput):

@@ -52,7 +52,6 @@ public PDone expand(PCollection<T> input) {
     return PDone.in(input.getPipeline());
   }
 
-  @SuppressWarnings("nullness")
   @AutoService(TransformPayloadTranslatorRegistrar.class)
   public static class Translator
       implements PTransformTranslation.TransformPayloadTranslator<FlinkOutput<?>>,
@@ -63,6 +62,7 @@ public String getUrn() {
     }
 
     @Override
+    @SuppressWarnings("nullness") // TODO(https://github.com/apache/beam/issues/20497)
     public RunnerApi.FunctionSpec translate(
         AppliedPTransform<?, ?, FlinkOutput<?>> application, SdkComponents components)
         throws IOException {
