Eliminate nullness errors from a bit of the FlinkRunner #32925

Draft · wants to merge 1 commit into master
CreateStreamingFlinkView.java

@@ -40,9 +40,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;

/** Flink streaming overrides for various view (side input) transforms. */
-@SuppressWarnings({
-"rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
-})
class CreateStreamingFlinkView<ElemT, ViewT>
extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
private final PCollectionView<ViewT> view;
@@ -110,7 +107,7 @@ public static class Factory<ElemT, ViewT>
PCollection<ElemT>,
PTransform<PCollection<ElemT>, PCollection<ElemT>>> {

-static final Factory INSTANCE = new Factory();
+static final Factory<?, ?> INSTANCE = new Factory<>();

private Factory() {}

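Note: the hunk above replaces a raw-typed singleton with a wildcard-typed one, which is what removes the need for the class-wide `rawtypes` suppression. A minimal, self-contained sketch of that pattern follows; the class name and the `typedInstance()` accessor are illustrative, not part of this PR.

```java
// Sketch only: a stateless singleton shared across all type arguments.
final class ViewFactory<A, B> {
  static final ViewFactory<?, ?> INSTANCE = new ViewFactory<>();

  private ViewFactory() {}

  // Callers recover a typed view with one contained unchecked cast;
  // this is safe because the instance holds no state of type A or B.
  @SuppressWarnings("unchecked")
  static <A, B> ViewFactory<A, B> typedInstance() {
    return (ViewFactory<A, B>) INSTANCE;
  }
}
```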
FlinkBatchPipelineTranslator.java

@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.flink;

+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -30,10 +32,6 @@
import org.slf4j.LoggerFactory;

/** {@link Pipeline.PipelineVisitor} for executing a {@link Pipeline} as a Flink batch job. */
-@SuppressWarnings({
-"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
-"nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {

private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
@@ -48,7 +46,7 @@ public FlinkBatchPipelineTranslator(ExecutionEnvironment env, PipelineOptions op
}

@Override
@SuppressWarnings("rawtypes, unchecked")
@SuppressWarnings({"rawtypes", "unchecked"})
public void translate(Pipeline pipeline) {
batchContext.init(pipeline);
super.translate(pipeline);
@@ -68,13 +66,22 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
LOG.info("{} enterCompositeTransform- {}", genSpaces(this.depth), node.getFullName());
this.depth++;

-BatchTransformTranslator<?> translator = getTranslator(node, batchContext);
+PTransform<?, ?> transform = node.getTransform();
+
+// Root of the graph is null
+if (transform == null) {
+return CompositeBehavior.ENTER_TRANSFORM;
+}
+
+BatchTransformTranslator<?> translator = getTranslator(transform, batchContext);

if (translator != null) {
-applyBatchTransform(node.getTransform(), node, translator);
+// This is a composite with a custom translator
+applyBatchTransform(transform, node, translator);
LOG.info("{} translated- {}", genSpaces(this.depth), node.getFullName());
return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
} else {
+// Composite without a custom translator
return CompositeBehavior.ENTER_TRANSFORM;
}
}
@@ -91,7 +98,10 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {

// get the transformation corresponding to the node we are
// currently visiting and translate it into its Flink alternative.
-PTransform<?, ?> transform = node.getTransform();
+PTransform<?, ?> transform =
+checkStateNotNull(
+node.getTransform(), "visitPrimitiveTransform invoked on node with no PTransform");

BatchTransformTranslator<?> translator =
FlinkBatchTransformTranslators.getTranslator(transform, batchContext);
if (translator == null) {
@@ -119,7 +129,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
}

/** A translator of a {@link PTransform}. */
-public interface BatchTransformTranslator<TransformT extends PTransform> {
+public interface BatchTransformTranslator<TransformT extends PTransform<?, ?>> {

default boolean canTranslate(TransformT transform, FlinkBatchTranslationContext context) {
return true;
@@ -129,14 +139,8 @@ default boolean canTranslate(TransformT transform, FlinkBatchTranslationContext
}

/** Returns a translator for the given node, if it is possible, otherwise null. */
-private static BatchTransformTranslator<?> getTranslator(
-TransformHierarchy.Node node, FlinkBatchTranslationContext context) {
-@Nullable PTransform<?, ?> transform = node.getTransform();
-
-// Root of the graph is null
-if (transform == null) {
-return null;
-}
+private static @Nullable BatchTransformTranslator<?> getTranslator(
+PTransform<?, ?> transform, FlinkBatchTranslationContext context) {

return FlinkBatchTransformTranslators.getTranslator(transform, context);
}
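Note: the translator changes above lean on `checkStateNotNull` from `org.apache.beam.sdk.util.Preconditions`. A minimal sketch of that style of guard, assuming it behaves like the standard fail-fast check below: it returns its argument typed as non-null, or throws `IllegalStateException` with the given message.

```java
import org.checkerframework.checker.nullness.qual.Nullable;

final class NullnessGuards {
  private NullnessGuards() {}

  // Sketch of a checkStateNotNull-style guard: after the null check,
  // the returned reference is non-null as far as the checker is concerned.
  static <T> T checkStateNotNull(@Nullable T value, String message) {
    if (value == null) {
      throw new IllegalStateException(message);
    }
    return value;
  }
}
```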
FlinkBatchTranslationContext.java

@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.flink;

+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
@@ -36,14 +38,12 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Helper for {@link FlinkBatchPipelineTranslator} and translators in {@link
* FlinkBatchTransformTranslators}.
*/
-@SuppressWarnings({
-"nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
class FlinkBatchTranslationContext {

private final Map<PValue, DataSet<?>> dataSets;
@@ -58,7 +58,7 @@ class FlinkBatchTranslationContext {
private final ExecutionEnvironment env;
private final PipelineOptions options;

-private AppliedPTransform<?, ?, ?> currentTransform;
+private @Nullable AppliedPTransform<?, ?, ?> currentTransform;

private final CountingPipelineVisitor countingPipelineVisitor = new CountingPipelineVisitor();
private final LookupPipelineVisitor lookupPipelineVisitor = new LookupPipelineVisitor();
@@ -97,7 +97,8 @@ public PipelineOptions getPipelineOptions() {
<T> DataSet<WindowedValue<T>> getInputDataSet(PValue value) {
// assume that the DataSet is used as an input if retrieved here
danglingDataSets.remove(value);
-return (DataSet<WindowedValue<T>>) dataSets.get(value);
+return (DataSet<WindowedValue<T>>)
+checkStateNotNull(dataSets.get(value), "No data set associated with PValue " + value);
}

<T> void setOutputDataSet(PValue value, DataSet<WindowedValue<T>> set) {
@@ -117,7 +118,9 @@ void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) {
}

AppliedPTransform<?, ?, ?> getCurrentTransform() {
-return currentTransform;
+return checkStateNotNull(
+currentTransform,
+"Attempted to get current transform when not in context of translating any transform");
}

Map<TupleTag<?>, Coder<?>> getOutputCoders(PTransform<?, ?> transform) {
@@ -126,7 +129,10 @@ Map<TupleTag<?>, Coder<?>> getOutputCoders(PTransform<?, ?> transform) {

@SuppressWarnings("unchecked")
<T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
-return (DataSet<T>) broadcastDataSets.get(value);
+return (DataSet<T>)
+checkStateNotNull(
+broadcastDataSets.get(value),
+"No broadcast data set associated with PCollectionView " + value);
}

<ViewT, ElemT> void setSideInputDataSet(
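Note: the hunks in this file all share one shape: `Map.get` may return null under the nullness checker, so the lookup is asserted non-null with a descriptive message before the unchecked cast. The same shape on illustrative types (this registry class is made up for the example):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative registry mirroring getInputDataSet/getSideInputDataSet:
// guard the lookup first, cast second, so a missing entry fails with a
// clear message instead of a NullPointerException somewhere downstream.
final class TypedRegistry {
  private final Map<String, Object> entries = new HashMap<>();

  void put(String key, Object value) {
    entries.put(key, value);
  }

  @SuppressWarnings("unchecked")
  <T> T get(String key) {
    Object value = entries.get(key); // may be null if key is absent
    if (value == null) {
      throw new IllegalStateException("No entry associated with key " + key);
    }
    return (T) value;
  }
}
```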
FlinkExecutionEnvironments.java

@@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -63,9 +64,6 @@
import org.slf4j.LoggerFactory;

/** Utilities for Flink execution environments. */
-@SuppressWarnings({
-"nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
public class FlinkExecutionEnvironments {

private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class);
@@ -225,7 +223,7 @@ public static StreamExecutionEnvironment createStreamExecutionEnvironment(
hostAndPort.getPort(),
flinkConfiguration,
filesToStage.toArray(new String[filesToStage.size()]),
-null,
+new URL[] {},
savepointRestoreSettings);
LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), hostAndPort.getPort());
}
@@ -325,7 +323,7 @@ public Map<String, String> toMap() {
}

@Override
-public boolean equals(Object obj) {
+public boolean equals(@Nullable Object obj) {
if (obj == null || this.getClass() != obj.getClass()) {
return false;
}
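Note: two idioms appear in this file: passing `new URL[] {}` rather than `null` for an array parameter, and annotating the `equals` parameter `@Nullable` because `Object.equals` must accept null. A small sketch of the latter on a made-up value class:

```java
import org.checkerframework.checker.nullness.qual.Nullable;

// Illustrative: under the Checker Framework, Object.equals takes a
// @Nullable argument, so overrides must declare it the same way or
// the override is flagged as narrowing the contract.
final class HostPort {
  private final String host;
  private final int port;

  HostPort(String host, int port) {
    this.host = host;
    this.port = port;
  }

  @Override
  public boolean equals(@Nullable Object obj) {
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
    HostPort other = (HostPort) obj;
    return port == other.port && host.equals(other.host);
  }

  @Override
  public int hashCode() {
    return host.hashCode() * 31 + port;
  }
}
```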
FlinkJobInvoker.java

@@ -26,19 +26,16 @@
import org.apache.beam.runners.jobsubmission.JobInvoker;
import org.apache.beam.runners.jobsubmission.PortablePipelineJarCreator;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.construction.PipelineOptionsTranslation;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Job Invoker for the {@link FlinkRunner}. */
-@SuppressWarnings({
-"nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
public class FlinkJobInvoker extends JobInvoker {
private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);

@@ -57,7 +54,7 @@ protected FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverCo
protected JobInvocation invokeWithExecutor(
RunnerApi.Pipeline pipeline,
Struct options,
-@Nullable String retrievalToken,
+String retrievalToken,
ListeningExecutorService executorService) {

// TODO: How to make Java/Python agree on names of keys and their values?
@@ -86,7 +83,7 @@ protected JobInvocation invokeWithExecutor(
pipelineRunner = new PortablePipelineJarCreator(FlinkPipelineRunner.class);
}

-flinkOptions.setRunner(null);
+unsafelySetRunnerNullForUnknownReason(flinkOptions);

LOG.info("Invoking job {} with pipeline runner {}", invocationId, pipelineRunner);
return createJobInvocation(
@@ -108,4 +105,9 @@ protected JobInvocation createJobInvocation(
PipelineOptionsTranslation.toProto(flinkOptions));
return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner);
}

@SuppressWarnings("nullness")
private void unsafelySetRunnerNullForUnknownReason(PipelineOptions options) {
options.setRunner(null);
}
}
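Note: rather than keeping a class-wide `@SuppressWarnings("nullness")`, the hunk above shrinks the suppression to a single helper whose name advertises the hazard. The same move on illustrative types (both classes below are invented for the example):

```java
// Illustrative: a setter whose parameter is implicitly non-null
// under the Checker Framework's default annotations.
class LegacyOptions {
  private String runner = "direct";

  void setRunner(String runner) {
    this.runner = runner;
  }
}

class Invoker {
  // The deliberate contract violation is confined to one named method,
  // so the rest of the class stays fully checked.
  @SuppressWarnings("nullness")
  static void unsafelyClearRunner(LegacyOptions options) {
    options.setRunner(null); // would be a checker error without the suppression
  }
}
```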
FlinkJobServerDriver.java

@@ -31,9 +31,6 @@
import org.slf4j.LoggerFactory;

/** Driver program that starts a job server for the Flink runner. */
-@SuppressWarnings({
-"nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
public class FlinkJobServerDriver extends JobServerDriver {

private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriver.class);
@@ -59,7 +56,7 @@ String getFlinkMaster() {
"Directory containing Flink YAML configuration files. "
+ "These properties will be set to all jobs submitted to Flink and take precedence "
+ "over configurations in FLINK_CONF_DIR.")
-private String flinkConfDir = null;
+private @Nullable String flinkConfDir = null;

@Nullable
String getFlinkConfDir() {
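Note: a field that legitimately starts out null has to say so, and its getter has to propagate that possibility. A minimal sketch of the annotated-field-plus-annotated-getter pairing used above, on an invented config class:

```java
import org.checkerframework.checker.nullness.qual.Nullable;

// Illustrative: confDir may be null until configured, and the getter
// passes that possibility on to callers instead of hiding it.
final class DriverConfig {
  private @Nullable String confDir = null;

  @Nullable
  String getConfDir() {
    return confDir;
  }

  void setConfDir(String confDir) {
    this.confDir = confDir;
  }
}
```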