From a58f05dc270c022396434357c1d288142825f1e6 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 23 Oct 2024 14:30:57 -0400 Subject: [PATCH] Eliminate nullness errors from a bit of the FlinkRunner --- .../flink/CreateStreamingFlinkView.java | 5 +-- .../flink/FlinkBatchPipelineTranslator.java | 38 ++++++++++--------- .../flink/FlinkBatchTranslationContext.java | 20 ++++++---- .../flink/FlinkExecutionEnvironments.java | 8 ++-- .../beam/runners/flink/FlinkJobInvoker.java | 14 ++++--- .../runners/flink/FlinkJobServerDriver.java | 5 +-- .../FlinkPipelineExecutionEnvironment.java | 26 +++++++------ .../runners/flink/FlinkPipelineOptions.java | 1 + .../runners/flink/FlinkPipelineRunner.java | 14 +++---- .../beam/runners/flink/FlinkRunner.java | 17 ++++++--- .../beam/runners/flink/FlinkRunnerResult.java | 13 +++---- .../FlinkStreamingTranslationContext.java | 36 ++++++++++-------- .../runners/jobsubmission/JobInvoker.java | 2 +- .../beam/runners/samza/SamzaJobInvoker.java | 3 +- .../beam/runners/spark/SparkJobInvoker.java | 3 +- .../ClasspathScanningResourcesDetector.java | 11 +++++- .../resources/PipelineResources.java | 3 +- .../resources/PipelineResourcesDetector.java | 3 +- 18 files changed, 122 insertions(+), 100 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java index 7861ec5371b9..2eb9bb5ca0a0 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java @@ -40,9 +40,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; /** Flink streaming overrides for various view (side input) transforms. 
*/ -@SuppressWarnings({ - "rawtypes" // TODO(https://github.com/apache/beam/issues/20447) -}) class CreateStreamingFlinkView extends PTransform, PCollection> { private final PCollectionView view; @@ -110,7 +107,7 @@ public static class Factory PCollection, PTransform, PCollection>> { - static final Factory INSTANCE = new Factory(); + static final Factory INSTANCE = new Factory<>(); private Factory() {} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java index b415c9b10559..c517c7324b95 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.flink; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.TransformHierarchy; @@ -30,10 +32,6 @@ import org.slf4j.LoggerFactory; /** {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a Flink batch job. 
*/ -@SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class); @@ -48,7 +46,7 @@ public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions op } @Override - @SuppressWarnings("rawtypes, unchecked") + @SuppressWarnings({"rawtypes", "unchecked"}) public void translate(Pipeline pipeline) { batchContext.init(pipeline); super.translate(pipeline); @@ -68,13 +66,22 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName()); this.depth++; - BatchTransformTranslator translator = getTranslator(node, batchContext); + PTransform transform = node.getTransform(); + + // Root of the graph is null + if (transform == null) { + return CompositeBehavior.ENTER_TRANSFORM; + } + + BatchTransformTranslator translator = getTranslator(transform, batchContext); if (translator != null) { - applyBatchTransform(node.getTransform(), node, translator); + // This is a composite with a custom translator + applyBatchTransform(transform, node, translator); LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName()); return CompositeBehavior.DO_NOT_ENTER_TRANSFORM; } else { + // Composite without a custom translator return CompositeBehavior.ENTER_TRANSFORM; } } @@ -91,7 +98,10 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { // get the transformation corresponding to the node we are // currently visiting and translate it into its Flink alternative. 
- PTransform transform = node.getTransform(); + PTransform transform = + checkStateNotNull( + node.getTransform(), "visitPrimitiveTransform invoked on node with no PTransform"); + BatchTransformTranslator translator = FlinkBatchTransformTranslators.getTranslator(transform, batchContext); if (translator == null) { @@ -119,7 +129,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) { } /** A translator of a {@link PTransform}. */ - public interface BatchTransformTranslator { + public interface BatchTransformTranslator> { default boolean canTranslate(TransformT transform, FlinkBatchTranslationContext context) { return true; @@ -129,14 +139,8 @@ default boolean canTranslate(TransformT transform, FlinkBatchTranslationContext } /** Returns a translator for the given node, if it is possible, otherwise null. */ - private static BatchTransformTranslator getTranslator( - TransformHierarchy.Node node, FlinkBatchTranslationContext context) { - @Nullable PTransform transform = node.getTransform(); - - // Root of the graph is null - if (transform == null) { - return null; - } + private static @Nullable BatchTransformTranslator getTranslator( + PTransform transform, FlinkBatchTranslationContext context) { return FlinkBatchTransformTranslators.getTranslator(transform, context); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java index 952546251c7a..131bc052e22a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.flink; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; 
@@ -36,14 +38,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Helper for {@link FlinkBatchPipelineTranslator} and translators in {@link * FlinkBatchTransformTranslators}. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) class FlinkBatchTranslationContext { private final Map> dataSets; @@ -58,7 +58,7 @@ class FlinkBatchTranslationContext { private final ExecutionEnvironment env; private final PipelineOptions options; - private AppliedPTransform currentTransform; + private @Nullable AppliedPTransform currentTransform; private final CountingPipelineVisitor countingPipelineVisitor = new CountingPipelineVisitor(); private final LookupPipelineVisitor lookupPipelineVisitor = new LookupPipelineVisitor(); @@ -97,7 +97,8 @@ public PipelineOptions getPipelineOptions() { DataSet> getInputDataSet(PValue value) { // assume that the DataSet is used as an input if retrieved here danglingDataSets.remove(value); - return (DataSet>) dataSets.get(value); + return (DataSet>) + checkStateNotNull(dataSets.get(value), "No data set associated with PValue " + value); } void setOutputDataSet(PValue value, DataSet> set) { @@ -117,7 +118,9 @@ void setCurrentTransform(AppliedPTransform currentTransform) { } AppliedPTransform getCurrentTransform() { - return currentTransform; + return checkStateNotNull( + currentTransform, + "Attempted to get current transform when not in context of translating any transform"); } Map, Coder> getOutputCoders(PTransform transform) { @@ -126,7 +129,10 @@ Map, Coder> getOutputCoders(PTransform transform) { @SuppressWarnings("unchecked") DataSet getSideInputDataSet(PCollectionView value) { - return (DataSet) broadcastDataSets.get(value); + return (DataSet) + checkStateNotNull( + broadcastDataSets.get(value), + "No broadcast data set associated 
with PCollectionView " + value); } void setSideInputDataSet( diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java index 102340329b6b..ab673bc950f1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import java.net.URL; import java.util.Collections; import java.util.List; import java.util.Map; @@ -63,9 +64,6 @@ import org.slf4j.LoggerFactory; /** Utilities for Flink execution environments. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class FlinkExecutionEnvironments { private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class); @@ -225,7 +223,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment( hostAndPort.getPort(), flinkConfiguration, filesToStage.toArray(new String[filesToStage.size()]), - null, + new URL[] {}, savepointRestoreSettings); LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), hostAndPort.getPort()); } @@ -325,7 +323,7 @@ public Map toMap() { } @Override - public boolean equals(Object obj) { + public boolean equals(@Nullable Object obj) { if (obj == null || this.getClass() != obj.getClass()) { return false; } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java index bbb3cc67ca4a..fc6b5eaf8052 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java @@ -26,19 +26,16 @@ import 
org.apache.beam.runners.jobsubmission.JobInvoker; import org.apache.beam.runners.jobsubmission.PortablePipelineJarCreator; import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PortablePipelineOptions; import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService; -import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Job Invoker for the {@link FlinkRunner}. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class FlinkJobInvoker extends JobInvoker { private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class); @@ -57,7 +54,7 @@ protected FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverCo protected JobInvocation invokeWithExecutor( RunnerApi.Pipeline pipeline, Struct options, - @Nullable String retrievalToken, + String retrievalToken, ListeningExecutorService executorService) { // TODO: How to make Java/Python agree on names of keys and their values? 
@@ -86,7 +83,7 @@ protected JobInvocation invokeWithExecutor( pipelineRunner = new PortablePipelineJarCreator(FlinkPipelineRunner.class); } - flinkOptions.setRunner(null); + unsafelySetRunnerNullForUnknownReason(flinkOptions); LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner); return createJobInvocation( @@ -108,4 +105,9 @@ protected JobInvocation createJobInvocation( PipelineOptionsTranslation.toProto(flinkOptions)); return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner); } + + @SuppressWarnings("nullness") + private void unsafelySetRunnerNullForUnknownReason(PipelineOptions options) { + options.setRunner(null); + } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java index 671cc2597cb2..6d584525e5ba 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java @@ -31,9 +31,6 @@ import org.slf4j.LoggerFactory; /** Driver program that starts a job server for the Flink runner. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class FlinkJobServerDriver extends JobServerDriver { private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class); @@ -59,7 +56,7 @@ String getFlinkMaster() { "Directory containing Flink YAML configuration files. 
" + "These properties will be set to all jobs submitted to Flink and take precedence " + "over configurations in FLINK_CONF_DIR.") - private String flinkConfDir = null; + private @Nullable String flinkConfDir = null; @Nullable String getFlinkConfDir() { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 029eff25a825..37cf1dc39e84 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.flink; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import java.util.Map; import org.apache.beam.runners.core.metrics.MetricsPusher; @@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,9 +45,6 @@ * FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and executes the * (translated) job. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) class FlinkPipelineExecutionEnvironment { private static final Logger LOG = @@ -60,7 +58,7 @@ class FlinkPipelineExecutionEnvironment { * org.apache.flink.api.java.LocalEnvironment} or a {@link * org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration options. 
*/ - private ExecutionEnvironment flinkBatchEnv; + private @Nullable ExecutionEnvironment flinkBatchEnv; /** * The Flink Streaming execution environment. This is instantiated to either a {@link @@ -68,7 +66,7 @@ class FlinkPipelineExecutionEnvironment { * org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending on the * configuration options, and more specifically, the url of the master. */ - private StreamExecutionEnvironment flinkStreamEnv; + private @Nullable StreamExecutionEnvironment flinkStreamEnv; /** * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the @@ -77,7 +75,7 @@ class FlinkPipelineExecutionEnvironment { * @param options the user-defined pipeline options. */ FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) { - this.options = checkNotNull(options); + this.options = options; } /** @@ -103,7 +101,8 @@ public void translate(Pipeline pipeline) { FlinkPipelineTranslator translator; if (options.isStreaming() || options.getUseDataStreamForBatch()) { - this.flinkStreamEnv = FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); + StreamExecutionEnvironment flinkStreamEnv = + FlinkExecutionEnvironments.createStreamExecutionEnvironment(options); if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) { LOG.warn( "UnboundedSources present which rely on checkpointing, but checkpointing is disabled."); @@ -113,6 +112,7 @@ public void translate(Pipeline pipeline) { if (!options.isStreaming()) { flinkStreamEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); } + this.flinkStreamEnv = flinkStreamEnv; } else { this.flinkBatchEnv = FlinkExecutionEnvironments.createBatchExecutionEnvironment(options); translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options); @@ -141,6 +141,7 @@ public PipelineResult executePipeline() throws Exception { final String jobName = options.getJobName(); if (flinkBatchEnv != null) { + ExecutionEnvironment 
flinkBatchEnv = this.flinkBatchEnv; if (options.getAttachedMode()) { JobExecutionResult jobExecutionResult = flinkBatchEnv.execute(jobName); return createAttachedPipelineResult(jobExecutionResult); @@ -149,6 +150,7 @@ public PipelineResult executePipeline() throws Exception { return createDetachedPipelineResult(jobClient, options); } } else if (flinkStreamEnv != null) { + StreamExecutionEnvironment flinkStreamEnv = this.flinkStreamEnv; if (options.getAttachedMode()) { JobExecutionResult jobExecutionResult = flinkStreamEnv.execute(jobName); return createAttachedPipelineResult(jobExecutionResult); @@ -170,7 +172,7 @@ private FlinkDetachedRunnerResult createDetachedPipelineResult( private FlinkRunnerResult createAttachedPipelineResult(JobExecutionResult result) { LOG.info("Execution finished in {} msecs", result.getNetRuntime()); Map accumulators = result.getAllAccumulatorResults(); - if (accumulators != null && !accumulators.isEmpty()) { + if (!accumulators.isEmpty()) { LOG.info("Final accumulator values:"); for (Map.Entry entry : result.getAllAccumulatorResults().entrySet()) { LOG.info("{} : {}", entry.getKey(), entry.getValue()); @@ -194,7 +196,7 @@ private FlinkRunnerResult createAttachedPipelineResult(JobExecutionResult result @VisibleForTesting JobGraph getJobGraph(Pipeline p) { translate(p); - StreamGraph streamGraph = flinkStreamEnv.getStreamGraph(); + StreamGraph streamGraph = checkStateNotNull(flinkStreamEnv).getStreamGraph(); // Normally the job name is set when we execute the job, and JobGraph is immutable, so we need // to set the job name here. 
streamGraph.setJobName(p.getOptions().getJobName()); @@ -203,11 +205,11 @@ JobGraph getJobGraph(Pipeline p) { @VisibleForTesting ExecutionEnvironment getBatchExecutionEnvironment() { - return flinkBatchEnv; + return checkStateNotNull(flinkBatchEnv); } @VisibleForTesting StreamExecutionEnvironment getStreamExecutionEnvironment() { - return flinkStreamEnv; + return checkStateNotNull(flinkStreamEnv); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 519afa795bc3..1d64ff8013fb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; +import org.checkerframework.dataflow.qual.Pure; /** * Options which can be used to configure the Flink Runner. diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java index 04b3b7e202dc..b9d4be5ed701 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import org.apache.beam.model.jobmanagement.v1.ArtifactApi; import org.apache.beam.model.pipeline.v1.RunnerApi; @@ -49,14 +50,11 @@ import org.slf4j.LoggerFactory; /** Runs a Pipeline on Flink via {@link FlinkRunner}. 
*/ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class FlinkPipelineRunner implements PortablePipelineRunner { private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class); private final FlinkPipelineOptions pipelineOptions; - private final String confDir; + private final @Nullable String confDir; private final List filesToStage; /** @@ -110,8 +108,8 @@ PortablePipelineResult runPipelineWithTranslator( private PortablePipelineResult createPortablePipelineResult( JobExecutionResult result, PipelineOptions options) { - String resultClassName = result.getClass().getCanonicalName(); - if (resultClassName.equals("org.apache.flink.core.execution.DetachedJobExecutionResult")) { + @Nullable Class resultClass = result.getClass(); + if (resultClass != null && Objects.equals(resultClass.getCanonicalName(), "org.apache.flink.core.execution.DetachedJobExecutionResult")) { LOG.info("Pipeline submitted in Detached mode"); // no metricsPusher because metrics are not supported in detached mode return new FlinkPortableRunnerResult.Detached(); @@ -195,14 +193,14 @@ private static class FlinkPipelineRunnerConfiguration { "Directory containing Flink YAML configuration files. " + "These properties will be set to all jobs submitted to Flink and take precedence " + "over configurations in FLINK_CONF_DIR.") - private String flinkConfDir = null; + private @Nullable String flinkConfDir = null; @Option( name = "--base-job-name", usage = "The job to run. This must correspond to a subdirectory of the jar's BEAM-PIPELINE " + "directory. 
*Only needs to be specified if the jar contains multiple pipelines.*") - private String baseJobName = null; + private @Nullable String baseJobName = null; } private static FlinkPipelineRunnerConfiguration parseArgs(String[] args) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 6945bead80bb..4916750bbaee 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.flink; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import java.util.HashSet; import java.util.Set; import java.util.SortedSet; @@ -43,9 +45,6 @@ * to a Flink Plan and then executing them either locally or on a Flink cluster, depending on the * configuration. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class FlinkRunner extends PipelineRunner { private static final Logger LOG = LoggerFactory.getLogger(FlinkRunner.class); @@ -139,14 +138,22 @@ private void logWarningIfPCollectionViewHasNonDeterministicKeyCoder(Pipeline pip @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { - if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) { + PTransform transform = + checkStateNotNull( + node.getTransform(), + "Primitive transform node does not have associated PTransform."); + if (ptransformViewsWithNonDeterministicKeyCoders.contains(transform)) { ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName()); } } @Override public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) { - if (ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) { + PTransform transform = + checkStateNotNull( + node.getTransform(), + "Composite transform node does not 
have associated PTransform."); + if (ptransformViewsWithNonDeterministicKeyCoders.contains(transform)) { ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName()); } return CompositeBehavior.ENTER_TRANSFORM; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index d892049bce4b..f106c493dd4a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -19,6 +19,7 @@ import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.ACCUMULATOR_NAME; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import java.util.Collections; import java.util.Map; @@ -31,9 +32,6 @@ * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Flink. This has methods to query * to job runtime and the final values of the accumulators. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class FlinkRunnerResult implements PipelineResult { private final Map accumulators; @@ -42,9 +40,7 @@ public class FlinkRunnerResult implements PipelineResult { FlinkRunnerResult(Map accumulators, long runtime) { this.accumulators = - (accumulators == null || accumulators.isEmpty()) - ? Collections.emptyMap() - : Collections.unmodifiableMap(accumulators); + accumulators.isEmpty() ? 
Collections.emptyMap() : Collections.unmodifiableMap(accumulators); this.runtime = runtime; } @@ -80,6 +76,9 @@ public MetricResults metrics() { } MetricsContainerStepMap getMetricsContainerStepMap() { - return (MetricsContainerStepMap) accumulators.get(ACCUMULATOR_NAME); + return (MetricsContainerStepMap) + checkStateNotNull( + accumulators.get(ACCUMULATOR_NAME), + "Cannot get metrics container step map: no such accumulator " + ACCUMULATOR_NAME); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java index 10ba64a77148..f5a4ebd1d718 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.flink; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import java.util.HashMap; import java.util.Map; @@ -39,14 +39,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Helper for keeping track of which {@link DataStream DataStreams} map to which {@link PTransform * PTransforms}. 
*/ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) class FlinkStreamingTranslationContext { private final StreamExecutionEnvironment env; @@ -61,12 +59,12 @@ class FlinkStreamingTranslationContext { private final Map> producers = new HashMap<>(); - private AppliedPTransform currentTransform; + private @Nullable AppliedPTransform currentTransform = null; public FlinkStreamingTranslationContext( StreamExecutionEnvironment env, PipelineOptions options, boolean isStreaming) { - this.env = checkNotNull(env); - this.options = checkNotNull(options); + this.env = env; + this.options = options; this.isStreaming = isStreaming; } @@ -84,11 +82,13 @@ public boolean isStreaming() { @SuppressWarnings("unchecked") public DataStream getInputDataStream(PValue value) { - return (DataStream) dataStreams.get(value); + return (DataStream) + checkStateNotNull(dataStreams.get(value), "No data stream associated with PValue " + value); } public void setOutputDataStream(PValue value, DataStream set) { - final PTransform previousProducer = producers.put(value, currentTransform.getTransform()); + final PTransform previousProducer = + producers.put(value, getCurrentTransform().getTransform()); Preconditions.checkArgument( previousProducer == null, "PValue can only have a single producer."); if (!dataStreams.containsKey(value)) { @@ -98,7 +98,9 @@ public void setOutputDataStream(PValue value, DataStream set) { @SuppressWarnings({"unchecked", "rawtypes"}) PTransform getProducer(T value) { - return (PTransform) producers.get(value); + return (PTransform) + checkStateNotNull( + producers.get(value), "No producer PTransform associated with PValue " + value); } /** @@ -121,7 +123,7 @@ public Coder getInputCoder(PCollection collection) { } public Map, Coder> getOutputCoders() { - return currentTransform.getOutputs().entrySet().stream() + return getCurrentTransform().getOutputs().entrySet().stream() .filter(e -> e.getValue() instanceof PCollection) 
.collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder())); } @@ -132,25 +134,27 @@ public TypeInformation> getTypeInfo(PCollection collecti } public AppliedPTransform getCurrentTransform() { - return currentTransform; + return checkStateNotNull( + currentTransform, + "Attempted to get current transform when not in context of translating any transform"); } @SuppressWarnings("unchecked") public T getInput(PTransform transform) { - return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform)); + return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform())); } public Map, PCollection> getInputs(PTransform transform) { - return currentTransform.getInputs(); + return getCurrentTransform().getInputs(); } @SuppressWarnings("unchecked") public T getOutput(PTransform transform) { - return (T) Iterables.getOnlyElement(currentTransform.getOutputs().values()); + return (T) Iterables.getOnlyElement(getCurrentTransform().getOutputs().values()); } public Map, PCollection> getOutputs( PTransform transform) { - return currentTransform.getOutputs(); + return getCurrentTransform().getOutputs(); } } diff --git a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java index 24f82e752b63..8ed09a245b76 100644 --- a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java +++ b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvoker.java @@ -39,7 +39,7 @@ public abstract class JobInvoker { protected abstract JobInvocation invokeWithExecutor( RunnerApi.Pipeline pipeline, Struct options, - @Nullable String retrievalToken, + String retrievalToken, ListeningExecutorService executorService) throws IOException; diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java index 2a45f20060e3..7f73b0d6d9bf 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvoker.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.samza; import java.util.UUID; -import javax.annotation.Nullable; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.jobsubmission.JobInvocation; @@ -56,7 +55,7 @@ protected SamzaJobInvoker(String name) { protected JobInvocation invokeWithExecutor( RunnerApi.Pipeline pipeline, Struct options, - @Nullable String retrievalToken, + String retrievalToken, ListeningExecutorService executorService) { LOG.trace("Parsing pipeline options"); final SamzaPortablePipelineOptions samzaOptions = diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java index 3125dedd1a36..5b3972619101 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java @@ -29,7 +29,6 @@ import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService; -import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +56,7 @@ private SparkJobInvoker(SparkJobServerDriver.SparkServerConfiguration configurat protected JobInvocation invokeWithExecutor( Pipeline pipeline, Struct options, - @Nullable String retrievalToken, + String retrievalToken, ListeningExecutorService executorService) { 
LOG.trace("Parsing pipeline options"); SparkPipelineOptions sparkOptions = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/ClasspathScanningResourcesDetector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/ClasspathScanningResourcesDetector.java index 4c289603ae0b..a89661daffd3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/ClasspathScanningResourcesDetector.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/ClasspathScanningResourcesDetector.java @@ -21,6 +21,8 @@ import java.io.File; import java.util.List; import java.util.stream.Collectors; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Attempts to detect all the resources to be staged using classgraph library. @@ -43,11 +45,16 @@ public ClasspathScanningResourcesDetector(ClassGraph classGraph) { * @return A list of absolute paths to the resources the class loader uses. 
*/ @Override - public List detect(ClassLoader classLoader) { + public List detect(@Nullable ClassLoader classLoader) { + + @SuppressWarnings("nullness") // ClassGraph is not null-correct + @NonNull + ClassLoader nullableClassLoader = classLoader; + List classpathContents = classGraph .disableNestedJarScanning() - .addClassLoader(classLoader) + .addClassLoader(nullableClassLoader) .scan(1) .getClasspathFiles(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/PipelineResources.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/PipelineResources.java index 545817f0d913..ed3a104cf056 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/PipelineResources.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/PipelineResources.java @@ -35,6 +35,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Funnels; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hasher; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,7 @@ public class PipelineResources { * @return A list of absolute paths to the resources the class loader uses. 
*/ public static List detectClassPathResourcesToStage( - ClassLoader classLoader, PipelineOptions options) { + @Nullable ClassLoader classLoader, PipelineOptions options) { PipelineResourcesOptions artifactsRelatedOptions = options.as(PipelineResourcesOptions.class); List detectedResources = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/PipelineResourcesDetector.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/PipelineResourcesDetector.java index e6146db4d72e..89919c89bc1b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/PipelineResourcesDetector.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/resources/PipelineResourcesDetector.java @@ -19,11 +19,12 @@ import java.io.Serializable; import java.util.List; +import org.checkerframework.checker.nullness.qual.Nullable; /** Interface for an algorithm detecting classpath resources for pipelines. */ public interface PipelineResourcesDetector extends Serializable { - List detect(ClassLoader classLoader); + List detect(@Nullable ClassLoader classLoader); /** Provides pipeline resources detection algorithm. */ interface Factory {