Eliminate nullness errors from a bit of the FlinkRunner
kennknowles committed Oct 23, 2024
1 parent 8d22fc2 commit be7d2ea
Showing 16 changed files with 115 additions and 92 deletions.
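The changes below all serve one goal: replacing blanket @SuppressWarnings("nullness") (and "rawtypes") annotations with code the checker can verify. The workhorse is org.apache.beam.sdk.util.Preconditions.checkStateNotNull, which throws IllegalStateException when its argument is null and otherwise returns it with a non-null type. A minimal sketch of the pattern — TranslatorRegistry and its members are hypothetical, not code from this commit:

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.util.HashMap;
import java.util.Map;

// Hypothetical example: Map.get may return null, so the lookup is checked
// once at the point of use instead of suppressing the checker class-wide.
class TranslatorRegistry {
  private final Map<String, String> translators = new HashMap<>();

  String getTranslatorOrThrow(String urn) {
    // Fails fast with IllegalStateException instead of a distant NullPointerException.
    return checkStateNotNull(
        translators.get(urn), "No translator registered for urn " + urn);
  }
}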
CreateStreamingFlinkView.java

@@ -40,9 +40,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 
 /** Flink streaming overrides for various view (side input) transforms. */
-@SuppressWarnings({
-  "rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
-})
 class CreateStreamingFlinkView<ElemT, ViewT>
     extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
   private final PCollectionView<ViewT> view;
@@ -110,7 +107,7 @@ public static class Factory<ElemT, ViewT>
           PCollection<ElemT>,
           PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
 
-    static final Factory INSTANCE = new Factory();
+    static final Factory<?, ?> INSTANCE = new Factory<>();
 
     private Factory() {}
 
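For context, not part of the diff: the INSTANCE fix above replaces a raw Factory with a wildcard-parameterized Factory<?, ?>, the usual way to keep a generic singleton without tripping the rawtypes warning. A sketch with hypothetical names:

// Illustrative only: a stateless generic singleton held with wildcards. When
// callers need concrete type parameters, a small checked-cast helper narrows it.
class Holder<T> {
  static final Holder<?> INSTANCE = new Holder<>(); // not the raw type Holder

  @SuppressWarnings("unchecked")
  static <T> Holder<T> instance() {
    return (Holder<T>) INSTANCE; // safe because Holder stores nothing of type T
  }

  private Holder() {}
}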
FlinkBatchPipelineTranslator.java

@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.flink;
 
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -30,10 +32,6 @@
 import org.slf4j.LoggerFactory;
 
 /** {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a Flink batch job. */
-@SuppressWarnings({
-  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
 
   private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
@@ -48,7 +46,7 @@ public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions op
   }
 
   @Override
-  @SuppressWarnings("rawtypes, unchecked")
+  @SuppressWarnings({"rawtypes", "unchecked"})
   public void translate(Pipeline pipeline) {
     batchContext.init(pipeline);
     super.translate(pipeline);
@@ -68,13 +66,22 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
     LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
     this.depth++;
 
-    BatchTransformTranslator<?> translator = getTranslator(node, batchContext);
+    PTransform<?, ?> transform = node.getTransform();
+
+    // Root of the graph is null
+    if (transform == null) {
+      return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    BatchTransformTranslator<?> translator = getTranslator(transform, batchContext);
 
     if (translator != null) {
-      applyBatchTransform(node.getTransform(), node, translator);
+      // This is a composite with a custom translator
+      applyBatchTransform(transform, node, translator);
       LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
       return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
     } else {
+      // Composite without a custom translator
       return CompositeBehavior.ENTER_TRANSFORM;
     }
   }
@@ -91,7 +98,10 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
 
     // get the transformation corresponding to the node we are
    // currently visiting and translate it into its Flink alternative.
-    PTransform<?, ?> transform = node.getTransform();
+    PTransform<?, ?> transform =
+        checkStateNotNull(
+            node.getTransform(), "visitPrimitiveTransform invoked on node with no PTransform");
+
     BatchTransformTranslator<?> translator =
         FlinkBatchTransformTranslators.getTranslator(transform, batchContext);
     if (translator == null) {
@@ -119,7 +129,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
   }
 
   /** A translator of a {@link PTransform}. */
-  public interface BatchTransformTranslator<TransformT extends PTransform> {
+  public interface BatchTransformTranslator<TransformT extends PTransform<?, ?>> {
 
     default boolean canTranslate(TransformT transform, FlinkBatchTranslationContext context) {
       return true;
@@ -129,14 +139,8 @@ default boolean canTranslate(TransformT transform, FlinkBatchTranslationContext
   }
 
   /** Returns a translator for the given node, if it is possible, otherwise null. */
-  private static BatchTransformTranslator<?> getTranslator(
-      TransformHierarchy.Node node, FlinkBatchTranslationContext context) {
-    @Nullable PTransform<?, ?> transform = node.getTransform();
-
-    // Root of the graph is null
-    if (transform == null) {
-      return null;
-    }
+  private static @Nullable BatchTransformTranslator<?> getTranslator(
+      PTransform<?, ?> transform, FlinkBatchTranslationContext context) {
 
     return FlinkBatchTransformTranslators.getTranslator(transform, context);
   }
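The reshuffled visitor logic above rests on one invariant: only the synthetic root node of a TransformHierarchy carries no PTransform. Composites therefore treat null as "root, keep descending," while primitives treat null as a broken invariant. A condensed sketch assuming that invariant — the types and method names here are stand-ins, not Beam APIs:

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import org.checkerframework.checker.nullness.qual.Nullable;

// Hypothetical skeleton reduced to the nullness decisions made above.
abstract class VisitorSketch {
  abstract @Nullable Object transformOf(Object node); // null only at the root

  boolean enterComposite(Object node) {
    Object transform = transformOf(node);
    if (transform == null) {
      return true; // root of the graph: nothing to translate, enter children
    }
    return !tryTranslate(transform); // skip children once a translator handles it
  }

  void visitPrimitive(Object node) {
    // A primitive always wraps a transform; null here is a bug, not the root.
    tryTranslate(checkStateNotNull(transformOf(node), "primitive node with no PTransform"));
  }

  abstract boolean tryTranslate(Object transform); // true if a translator applied
}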
FlinkBatchTranslationContext.java

@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.flink;
 
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -36,14 +38,12 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
  * Helper for {@link FlinkBatchPipelineTranslator} and translators in {@link
  * FlinkBatchTransformTranslators}.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 class FlinkBatchTranslationContext {
 
   private final Map<PValue, DataSet<?>> dataSets;
@@ -58,7 +58,7 @@ class FlinkBatchTranslationContext {
   private final ExecutionEnvironment env;
   private final PipelineOptions options;
 
-  private AppliedPTransform<?, ?, ?> currentTransform;
+  private @Nullable AppliedPTransform<?, ?, ?> currentTransform;
 
   private final CountingPipelineVisitor countingPipelineVisitor = new CountingPipelineVisitor();
   private final LookupPipelineVisitor lookupPipelineVisitor = new LookupPipelineVisitor();
@@ -97,7 +97,8 @@ public PipelineOptions getPipelineOptions() {
   <T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
     // assume that the DataSet is used as an input if retrieved here
     danglingDataSets.remove(value);
-    return (DataSet<WindowedValue<T>>) dataSets.get(value);
+    return (DataSet<WindowedValue<T>>)
+        checkStateNotNull(dataSets.get(value), "No data set associated with PValue " + value);
   }
 
   <T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) {
@@ -117,7 +118,9 @@ void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
   }
 
   AppliedPTransform<?, ?, ?> getCurrentTransform() {
-    return currentTransform;
+    return checkStateNotNull(
+        currentTransform,
+        "Attempted to get current transform when not in context of translating any transform");
   }
 
   Map<TupleTag<?>, Coder<?>> getOutputCoders(PTransform<?, ?> transform) {
@@ -126,7 +129,10 @@ Map<TupleTag<?>, Coder<?>> getOutputCoders(PTransform<?, ?> transform) {
 
   @SuppressWarnings("unchecked")
   <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
-    return (DataSet<T>) broadcastDataSets.get(value);
+    return (DataSet<T>)
+        checkStateNotNull(
+            broadcastDataSets.get(value),
+            "No broadcast data set associated with PCollectionView " + value);
  }
 
   <ViewT, ElemT> void setSideInputDataSet(
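The currentTransform change above is the nullable-field-with-checking-getter idiom: the field is honestly @Nullable because it only holds a value while some transform is being translated, and the getter converts an out-of-context call into an immediate, well-described IllegalStateException. A minimal sketch with hypothetical names:

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import org.checkerframework.checker.nullness.qual.Nullable;

// Hypothetical reduction of the idiom used for currentTransform.
class TranslationScope {
  private @Nullable String current; // null whenever no translation is active

  void enter(String transform) {
    this.current = transform;
  }

  String current() {
    // "Called at the wrong time" becomes a clear failure, not a distant NPE.
    return checkStateNotNull(current, "No transform is currently being translated");
  }
}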
FlinkExecutionEnvironments.java

@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -63,9 +64,6 @@
 import org.slf4j.LoggerFactory;
 
 /** Utilities for Flink execution environments. */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public class FlinkExecutionEnvironments {
 
   private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
@@ -225,7 +223,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
             hostAndPort.getPort(),
             flinkConfiguration,
             filesToStage.toArray(new String[filesToStage.size()]),
-            null,
+            new URL[] {},
             savepointRestoreSettings);
     LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), hostAndPort.getPort());
   }
@@ -325,7 +323,7 @@ public Map<String, String> toMap() {
   }
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(@Nullable Object obj) {
     if (obj == null || this.getClass() != obj.getClass()) {
       return false;
     }
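Two notes on this file. The new URL[] {} passes an empty array where a null was previously handed to the Flink remote-environment constructor (presumably a classpath parameter), so the callee never sees null. And equals gains a @Nullable parameter because Object.equals is specified to accept null (and return false); under the Checker Framework an override with a plain Object parameter is flagged as incompatible. A sketch of the latter with a hypothetical value class:

import org.checkerframework.checker.nullness.qual.Nullable;

// Hypothetical example of the equals(@Nullable Object) convention.
final class HostPort {
  private final String host;
  private final int port;

  HostPort(String host, int port) {
    this.host = host;
    this.port = port;
  }

  @Override
  public boolean equals(@Nullable Object obj) {
    if (obj == null || getClass() != obj.getClass()) {
      return false; // equals must tolerate null and report inequality
    }
    HostPort other = (HostPort) obj;
    return port == other.port && host.equals(other.host);
  }

  @Override
  public int hashCode() {
    return 31 * host.hashCode() + port;
  }
}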
FlinkJobInvoker.java

@@ -26,19 +26,16 @@
 import org.apache.beam.runners.jobsubmission.JobInvoker;
 import org.apache.beam.runners.jobsubmission.PortablePipelineJarCreator;
 import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
 import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Job Invoker for the {@link FlinkRunner}. */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public class FlinkJobInvoker extends JobInvoker {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);
 
@@ -57,7 +54,7 @@ protected FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverCo
   protected JobInvocation invokeWithExecutor(
       RunnerApi.Pipeline pipeline,
       Struct options,
-      @Nullable String retrievalToken,
+      String retrievalToken,
       ListeningExecutorService executorService) {
 
     // TODO: How to make Java/Python agree on names of keys and their values?
@@ -86,7 +83,7 @@ protected JobInvocation invokeWithExecutor(
       pipelineRunner = new PortablePipelineJarCreator(FlinkPipelineRunner.class);
     }
 
-    flinkOptions.setRunner(null);
+    unsafelySetRunnerNullForUnknownReason(flinkOptions);
 
     LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner);
     return createJobInvocation(
@@ -108,4 +105,9 @@ protected JobInvocation createJobInvocation(
         PipelineOptionsTranslation.toProto(flinkOptions));
     return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner);
   }
+
+  @SuppressWarnings("nullness")
+  private void unsafelySetRunnerNullForUnknownReason(PipelineOptions options) {
+    options.setRunner(null);
+  }
 }
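The setRunner(null) hunk shows a third tactic, visible in the diff itself: when a nullness violation cannot be fixed yet, it is quarantined in a one-line helper whose @SuppressWarnings and deliberately loud name (unsafelySetRunnerNullForUnknownReason) both confine the suppression and document the debt. The same move with hypothetical names:

import java.util.HashMap;
import java.util.Map;

// Illustrative only: the suppression covers one helper, not the whole class,
// and the method name advertises that the null is unexplained.
class ConfigScrubber {
  void scrub(Map<String, String> config) {
    unsafelyClearOwnerForUnknownReason(config);
  }

  @SuppressWarnings("nullness")
  private void unsafelyClearOwnerForUnknownReason(Map<String, String> config) {
    config.put("owner", null); // values are declared non-null; checker would flag this
  }
}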
FlinkJobServerDriver.java

@@ -31,9 +31,6 @@
 import org.slf4j.LoggerFactory;
 
 /** Driver program that starts a job server for the Flink runner. */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public class FlinkJobServerDriver extends JobServerDriver {
 
   private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class);
@@ -59,7 +56,7 @@ String getFlinkMaster() {
           "Directory containing Flink YAML configuration files. "
              + "These properties will be set to all jobs submitted to Flink and take precedence "
              + "over configurations in FLINK_CONF_DIR.")
-  private String flinkConfDir = null;
+  private @Nullable String flinkConfDir = null;
 
   @Nullable
   String getFlinkConfDir() {
(Diff truncated: remaining changed files not shown.)