Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow local runners to execute arbitrary cross language pipelines without Docker. #29283

Merged
merged 7 commits into from
Dec 1, 2023

Conversation

robertwb
Copy link
Contributor

@robertwb robertwb commented Nov 3, 2023

This is accomplished by re-using the expansion service subprocesses as the workers themselves.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link
Contributor

github-actions bot commented Nov 3, 2023

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@robertwb robertwb force-pushed the expansion-loopback branch 8 times, most recently from f8477a7 to 93af8ac Compare November 8, 2023 20:25
Copy link
Contributor

github-actions bot commented Nov 8, 2023

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @tvalentyn for label python.
R: @m-trieu for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

.build();
}

public static List<Environment> expandAnyOfEnvironments(Environment environment) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be an ImmutableList?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I generally try to avoid exposing (our vendered) guava in our public API. Given that this is created on the fly, there's little benefit to making it immutable.

@@ -287,26 +283,50 @@ public void run(RunJobRequest request, StreamObserver<RunJobResponse> responseOb
}
}

private Map<String, List<RunnerApi.ArtifactInformation>> extractDependencies(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this return/be an ImmutableMap

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. (I suppose here the method is private, but it's stell IMHO preferable to not declare implementation details in the signature.)

}
return dependencies;
}

private RunnerApi.Pipeline resolveDependencies(RunnerApi.Pipeline pipeline, String stagingToken) {
Map<String, List<RunnerApi.ArtifactInformation>> resolvedDependencies =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of allowing null here can we default to an empty map if stagingService.getService().getStagedArtifacts(stagingToken) returns null?

or wrap in an optional

can mark as @Nullable if the other 2 are not feasible

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

null seems to indicate that no staging was attempted here (which is technically fine for something that has no artifacts, but we can't assume that if there's no entry). This is different from "the dependencies resolved to the empty set" which would be an error to just accept.

Marking it as @nullable is painful as the null checker isn't smart enough to figure out that it'll never be null in the second clause.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack thanks!

@robertwb robertwb mentioned this pull request Nov 9, 2023
3 tasks
@tvalentyn
Copy link
Contributor

@robertwb is this still a WIP or ready to review?

@robertwb
Copy link
Contributor Author

Yes, this is ready for review. (I was trying to get all the presubmits to pass, but I think this is the CI go issues that have been plaguing us elsewhere.)

@lostluck
Copy link
Contributor

I can confirm I'm chatting down flakes in prism.

Copy link
Contributor

Reminder, please take a look at this pr: @tvalentyn @m-trieu

This allows an SDK (including, importantly,  an expansion service)
to provide several alternatives environments suitable for running
a pipeline.
@robertwb
Copy link
Contributor Author

I rebased and resolved the conflicts with master. Please take a look.

pipeline_proto.components.environments[docker_env_id].CopyFrom(
embedded_env)
for docker_env_id in python_docker_environments:
pipeline_proto.components.environments[docker_env_id].CopyFrom(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my education, what is the meaning of this? We initializing missing attributes (env capabilities, etc) on the docker environment using default embedded environment as template?

Also, could you please add a docstring to embed_default_docker_image ? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed this as I realize it wasn't very clear. You can't assign to a proto map in Python, you have to use CopyFrom.

Also, docstring added.

if key not in self._cache:
self._cache[key] = (self._constructor(*key), set())
for owner in self._live_owners:
self._cache[key][-1].add(owner)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we store NamedTuples instead of tuples so that self._cache[key][-1] becomes self._cache[key].owners ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

for _ in range(5):
if process.poll() is not None:
break
logging.debug("Sending SIGINT to job_server")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the process always a job_server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Updated.

Copy link
Contributor

@m-trieu m-trieu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM for java

Copy link
Contributor

@tvalentyn tvalentyn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! Also, thanks for structuring commits in a way that made the change easier to review.

@tvalentyn
Copy link
Contributor

PTAL at lint. ERROR: /runner/_work/beam/beam/sdks/python/test-suites/tox/py38/build/srcs/sdks/python/apache_beam/utils/subprocess_server.py Imports are incorrectly sorted.

@robertwb
Copy link
Contributor Author

Thanks! Yeah, I'll fix that lint error before merging.

…peline.

This can greatly reduce startup time when many cross-langauge transforms
are used, but more importantly by keeping these processes alive we open up
the potential for using them as workers as well.

These can be cached across longer durations as well, but this is the default.
Due to the AnyOf environment, remote runners can choose more
expensive but remote-friendly options such as docker.
@robertwb robertwb merged commit f337c74 into apache:master Dec 1, 2023
97 checks passed
@Abacn
Copy link
Contributor

Abacn commented Mar 6, 2024

It appears Python XVR Direct is failing after this change. Specifically #28972 (comment) any ideas?

Last successful run (before it was flaky passing 1-2 jobs): https://github.com/apache/beam/actions/runs/7059453030
First failed run (after that both jobs always failing): https://github.com/apache/beam/actions/runs/7063238797

and this is the only relavant change:

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants