diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamAdapterUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamAdapterUtils.java index 065796057c23..46e91c7598a0 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamAdapterUtils.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamAdapterUtils.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -50,7 +51,7 @@ Map translate( ExecutionEnvironment executionEnvironment); } - @SuppressWarnings({"nullness", "rawtypes"}) + @SuppressWarnings({"rawtypes"}) static Map applyBeamPTransformInternal( Map inputs, @@ -75,7 +76,8 @@ Map applyBeamPTransformInternal( new FlinkInput<>( key, BeamAdapterCoderUtils.typeInformationToCoder( - getTypeInformation.apply(flinkInput), coderRegistry))))); + getTypeInformation.apply(Preconditions.checkNotNull(flinkInput)), + coderRegistry))))); // Actually apply the transform to create Beam outputs. Map> beamOutputs = diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java index 93b3a8492a67..f3e20f3d9a94 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; import org.apache.beam.sdk.util.construction.graph.PipelineNode; @@ -59,55 +60,54 @@ public BeamFlinkDataSetAdapter(PipelineOptions pipelineOptions) { this.pipelineOptions = pipelineOptions; } - @SuppressWarnings("nullness") public > DataSet applyBeamPTransform( DataSet input, PTransform> transform) { return (DataSet) - this.>applyBeamPTransformInternal( + getNonNull( + applyBeamPTransformInternal( ImmutableMap.of("input", input), - (pipeline, map) -> (CollectionT) map.get("input"), + (pipeline, map) -> (CollectionT) getNonNull(map, "input"), (output) -> ImmutableMap.of("output", output), transform, - input.getExecutionEnvironment()) - .get("output"); + input.getExecutionEnvironment()), + "output"); } - @SuppressWarnings("nullness") public DataSet applyBeamPTransform( Map> inputs, PTransform> transform) { return (DataSet) - applyBeamPTransformInternal( + getNonNull( + applyBeamPTransformInternal( inputs, BeamAdapterUtils::mapToTuple, (output) -> ImmutableMap.of("output", output), transform, - inputs.values().stream().findAny().get().getExecutionEnvironment()) - .get("output"); + inputs.values().stream().findAny().get().getExecutionEnvironment()), + "output"); } - @SuppressWarnings("nullness") public DataSet applyBeamPTransform( ExecutionEnvironment executionEnvironment, PTransform> transform) { return (DataSet) - applyBeamPTransformInternal( + getNonNull( + applyBeamPTransformInternal( ImmutableMap.>of(), (pipeline, map) -> PBegin.in(pipeline), (output) -> ImmutableMap.of("output", output), transform, - executionEnvironment) - .get("output"); + executionEnvironment), + "output"); } - @SuppressWarnings("nullness") public > Map> applyMultiOutputBeamPTransform( DataSet input, PTransform transform) { return applyBeamPTransformInternal( ImmutableMap.of("input", input), - (pipeline, map) -> (CollectionT) map.get("input"), + (pipeline, map) -> (CollectionT) getNonNull(map, "input"), BeamAdapterUtils::tupleToMap, transform, input.getExecutionEnvironment()); @@ -134,13 +134,12 @@ public Map> applyMultiOutputBeamPTransform( executionEnvironment); } - @SuppressWarnings("nullness") public > void applyNoOutputBeamPTransform( DataSet input, PTransform transform) { applyBeamPTransformInternal( ImmutableMap.of("input", input), - (pipeline, map) -> (CollectionT) map.get("input"), + (pipeline, map) -> (CollectionT) getNonNull(map, "input"), pDone -> ImmutableMap.of(), transform, input.getExecutionEnvironment()); @@ -217,11 +216,11 @@ private FlinkBatchPortablePipelineTranslator.PTransformTranslator flink // When we run into a FlinkInput operator, it "produces" the corresponding input as its // "computed result." String inputId = t.getTransform().getSpec().getPayload().toStringUtf8(); - DataSet flinkInput = (DataSet) inputMap.get(inputId); - // To make the nullness checker happy... - if (flinkInput == null) { - throw new IllegalStateException("Missing input: " + inputId); - } + DataSet flinkInput = + org.apache.beam.sdk.util.Preconditions.checkStateNotNull( + (DataSet) inputMap.get(inputId), + "missing input referenced in proto: ", + inputId); context.addDataSet( Iterables.getOnlyElement(t.getTransform().getOutputsMap().values()), // new MapOperator(...) rather than .map to manually designate the type information. @@ -262,4 +261,8 @@ private FlinkBatchPortablePipelineTranslator.PTransformTranslator flink "StripWindows")); }; } + + private V getNonNull(Map map, K key) { + return Preconditions.checkStateNotNull(map.get(Preconditions.checkArgumentNotNull(key))); + } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkInput.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkInput.java index 0b76319e9eac..7ee261ca4b5f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkInput.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkInput.java @@ -61,7 +61,6 @@ public PCollection expand(PBegin input) { } // This Translator translates this kind of PTransform into a Beam proto representation. - @SuppressWarnings("nullness") @AutoService(TransformPayloadTranslatorRegistrar.class) public static class Translator implements PTransformTranslation.TransformPayloadTranslator>, @@ -72,6 +71,7 @@ public String getUrn() { } @Override + @SuppressWarnings("nullness") // TODO(https://github.com/apache/beam/issues/20497) public RunnerApi.FunctionSpec translate( AppliedPTransform> application, SdkComponents components) throws IOException { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkOutput.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkOutput.java index 2cc25236c1c3..64577901765d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkOutput.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/FlinkOutput.java @@ -52,7 +52,6 @@ public PDone expand(PCollection input) { return PDone.in(input.getPipeline()); } - @SuppressWarnings("nullness") @AutoService(TransformPayloadTranslatorRegistrar.class) public static class Translator implements PTransformTranslation.TransformPayloadTranslator>, @@ -63,6 +62,7 @@ public String getUrn() { } @Override + @SuppressWarnings("nullness") // TODO(https://github.com/apache/beam/issues/20497) public RunnerApi.FunctionSpec translate( AppliedPTransform> application, SdkComponents components) throws IOException {