
Initial prototype of using Beam transforms directly in a Flink pipeline. #30332

Merged
9 commits merged into apache:master on Mar 4, 2024

Conversation

robertwb
Contributor

Handling DataStream, windowing, and more complex types will come in a future PR.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@robertwb
Contributor Author

R: @kennknowles

Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@robertwb
Contributor Author

(Looks like I'm going to have to rebase this on the org.apache.beam.runners.core.construction move...)

@robertwb
Contributor Author

> (Looks like I'm going to have to rebase this on the org.apache.beam.runners.core.construction move...)

Done. PTAL.

@robertwb
Contributor Author

Ping.

Member

@kennknowles kennknowles left a comment

My overall comment is just to tighten it up by removing suppressions and looking to eliminate implementation inheritance.

public class BeamAdapterUtils {
private BeamAdapterUtils() {}

@SuppressWarnings("nullness")
Member

This much cyclomatic complexity and you want to suppress null checks? 😛

Contributor Author

I tried to suppress them as locally as possible.

I've changed this to explicitly assert non-nullness here, but I'm not sure it's an improvement.

Member

Local checks/assertions are better, because they don't mask errors that might be introduced as the code is edited. We should aim for no broken windows / zero suppressions. Certainly in new code. Any suppression should be linked to a bug with an explanation for why it cannot be addressed.

org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull and org.apache.beam.sdk.util.Preconditions.checkStateNotNull are both good options. They convert an NPE at some random point in the code into an IllegalArgumentException or IllegalStateException at the point of detection.
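The suppression-vs-assertion tradeoff being discussed can be sketched in plain Java. The `checkStateNotNull` helper below is a simplified stand-in mimicking Beam's `org.apache.beam.sdk.util.Preconditions.checkStateNotNull`, not the actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class NullnessDemo {
  // Simplified stand-in for org.apache.beam.sdk.util.Preconditions.checkStateNotNull:
  // converts a would-be NPE into an IllegalStateException at the point of detection.
  static <T> T checkStateNotNull(T value, String message) {
    if (value == null) {
      throw new IllegalStateException(message);
    }
    return value;
  }

  public static void main(String[] args) {
    Map<String, Integer> counts = new HashMap<>();
    counts.put("a", 1);
    // The failure is reported at the lookup site, not at some later dereference:
    int a = checkStateNotNull(counts.get("a"), "missing key: a");
    System.out.println(a);
    try {
      checkStateNotNull(counts.get("b"), "missing key: b");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

The exception message pinpoints the failing lookup, which is what makes the local check preferable to a blanket `@SuppressWarnings("nullness")`.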

}
}

static <T> TypeInformation<T> coderTotoTypeInformation(Coder<T> coder, PipelineOptions options) {
Member

Toto

Contributor Author

Done.

import org.apache.flink.api.java.ExecutionEnvironment;

public abstract class BeamFlinkAbstractAdapter<DataSetOrStream> {
protected final PipelineOptions pipelineOptions;
Member

My take here is that this superclass is sensible standalone logic that is not in a mutually-recursive relationship with the subclass. So remove the subclassing and pass the { getTypeInformation; createContext } thing in to the logic here.

Kind of a stylistic point, but still, protected fields where the values are passed through the super constructor anti-pattern called attention to it.

Plus, then the stricter separation between Dataset / DataStream parameterization will be better, and this primary logic won't have it bleed in.

Unless there is a mutual recursion I missed... which also I would have missed because the control flow between subclass and superclass is hard to follow, hence good to eliminate.

Contributor Author

I was able to extract it out. (This was an artifact of splitting the class up on the generic vs. type-specific bits.) I think this is better now, though it does give one method that has a ton of parameters...

I also pulled out FlinkInput/FlinkOutput, but we could make them inner classes again if you'd prefer.
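The refactoring discussed here, replacing subclass hooks with logic that receives the type-specific bits as a parameter, can be sketched as follows. `TypeStrategy` and `translate` are illustrative names, not the actual adapter code:

```java
public class CompositionSketch {
  // The type-specific bits (e.g. getTypeInformation for DataSet vs. DataStream)
  // are passed in as a strategy instead of being overridden in a subclass.
  interface TypeStrategy<T> {
    String getTypeInformation(T value);
  }

  // Shared translation logic: no protected fields, no super-constructor plumbing,
  // and no control flow bouncing between superclass and subclass.
  static <T> String translate(T input, TypeStrategy<T> strategy) {
    return "translated:" + strategy.getTypeInformation(input);
  }

  public static void main(String[] args) {
    TypeStrategy<Integer> dataSetStyle = v -> "Int(" + v + ")";
    System.out.println(translate(42, dataSetStyle));
  }
}
```

The tradeoff mentioned in the reply shows up here too: everything the strategy used to read from protected fields must now arrive as parameters, which can lengthen the signature.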

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;

public abstract class BeamFlinkAbstractAdapter<DataSetOrStream> {
Member

T suffix would be traditional to indicate it is a type variable (since java doesn't have lexicographic distinction)


DataSet<String> input = env.fromCollection(ImmutableList.of("a", "b", "c"));
DataSet<String> result =
new BeamFlinkDataSetAdapter(PipelineOptionsFactory.create(), env)
Member

I'd say we should try to find a way to polish this into an extremely minimal API as we go:

  • make the pipeline options optional and default them internal to the adapter
  • extract the env from the inputs so you don't have to pass it here

Ideally we could get very close to this signature:

   DataSet<OutputT> applyBeamTransform(DataSet<InputT>, PTransform<InputT, OutputT>)

(which of course is begging to be upstreamed into DataSet API)

Contributor Author

+1

PipelineOptions are now optional. I had initially thought of extracting the env from the input, but this is not possible for transforms with no inputs. I actually don't think that's so bad for the Reads (it's similar to what one needs to do for Flink anyway), but this now precludes empty tuples. I think these are rare; we could add an overload taking the environment explicitly if need be.

I had considered going all the way and making these static methods, but I want to reserve the right to store any other state here (including possibly across invocations) and this'll allow for people to choose shorter variable names too.
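The minimal API shape being converged on, with options defaulted via an overload, can be sketched with stand-in types. `FakeDataSet` stands in for Flink's `DataSet` and a plain `Function` for a Beam `PTransform`; none of these names are the real APIs:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class MinimalApiSketch {
  // Illustrative stand-in for Flink's DataSet.
  static class FakeDataSet<T> {
    final List<T> elements;
    FakeDataSet(List<T> elements) { this.elements = elements; }
  }

  static class Adapter {
    // Options-free overload that defaults options internally, mirroring the
    // "PipelineOptions are now optional" change described above.
    <I, O> FakeDataSet<O> applyBeamTransform(FakeDataSet<I> input, Function<I, O> transform) {
      return applyBeamTransform(input, transform, "default-options");
    }

    <I, O> FakeDataSet<O> applyBeamTransform(
        FakeDataSet<I> input, Function<I, O> transform, String options) {
      // A real adapter would translate the Beam transform against the Flink
      // environment extracted from the input; here we just map the elements.
      return new FakeDataSet<>(
          input.elements.stream().map(transform).collect(Collectors.toList()));
    }
  }

  public static void main(String[] args) {
    FakeDataSet<String> input = new FakeDataSet<>(List.of("a", "b", "c"));
    FakeDataSet<String> result =
        new Adapter().applyBeamTransform(input, s -> s.toUpperCase());
    System.out.println(result.elements);
  }
}
```

Keeping the adapter an instance (rather than static methods) matches the reply above: it leaves room for per-adapter state and lets callers pick short variable names.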

translator.translate(translationContext, translator.prepareForTranslation(pipelineProto));
}

static class FlinkInput<T> extends PTransform<PBegin, PCollection<T>> {
Member

I don't recall how the visibility of this resolves. Perhaps a minor thing but it would be nice for this to be package-private. In fact ideally everything is package private except just the call to applyBeamTransform(DataSet, PTransform)

Contributor Author

These are package private. applyBeamTransform is the only thing I want to be public.

import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.construction.Environments;
import org.apache.beam.sdk.util.construction.NativeTransforms;
Member

I had forgotten about this or missed it. That's fun...

Contributor Author

Yeah. Means there's possibly issues linking in more than one runner though...

@kennknowles
Member

Otherwise LGTM as far as getting this going. Very cool!

Contributor Author

@robertwb robertwb left a comment

Thanks for your thoughtful review. PTAL.

@robertwb
Contributor Author

robertwb commented Mar 4, 2024

I've addressed all your comments so I'm going to go on your LGTM and merge this.

@robertwb robertwb merged commit 9c3f209 into apache:master Mar 4, 2024
22 checks passed
@kennknowles
Member

There are still lots of nullness checks suppressed that presumably should be fixed, not suppressed. I didn't see any systemic reason to disable checking.

@robertwb
Contributor Author

robertwb commented Mar 4, 2024

Most of these are because Map.get() uses nulls rather than exceptions to indicate that keys aren't there (even keys that tautologically should be present). Cleaned up at #30488
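The Map.get() issue described here can be illustrated with plain java.util, using Objects.requireNonNull as the point-of-detection check (the actual cleanup in #30488 may use Beam's Preconditions helpers instead):

```java
import java.util.Map;
import java.util.Objects;

public class MapGetDemo {
  public static void main(String[] args) {
    Map<String, String> transforms = Map.of("read", "FlinkInput");
    // Map.get returns @Nullable as far as the checker is concerned, even when
    // the key is known to be present, so every lookup needs either a
    // suppression or a local check like this one:
    String value = Objects.requireNonNull(transforms.get("read"), "read must be registered");
    System.out.println(value);
  }
}
```

The check documents the "key must be present" invariant in code, so the nullness checker is satisfied without a blanket suppression.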
