Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#28187][prism] Basic cross language support. #28545

Merged
merged 2 commits into from
Sep 20, 2023

Conversation

lostluck
Copy link
Contributor

@lostluck lostluck commented Sep 19, 2023

This PR allows prism to pass the 7 of the 8 tests in the Basic Xlang tests. This remains experimental for prism.

Fixes a bug in Go SDK iterators where io.EOF was getting incorrectly wrapped, preventing downstream behavior on iterator read calls. Triggered by the Java CoGBK transform results.

TestXLang_CoGroupBy is the only one that doesn't pass at present, but passing CoGBKResult output over cross language boundaries is uncommon. #28544 filed to resolve.

  • Allow startup and shutdown of multiple workers.
    • Worker stop is cleaned up to occur on post job context cancellation, so Serving has moved to each environment.
    • Open question: Maybe only start the worker for an environment if it executes something? GBK and Flatten are done runner side, and don't need their SDK environments...
  • Plumb through the "correct" worker for stages.
  • Allow prism to "support" Java CombineGlobally, and CombineByValues urns, by relying on the SDK backup implementation.
    • Similarly "support" the FireAlways trigger, by not failing pipelines that use it. However Prism doesn't yet actually support trigger behavior, but since only a single "ontime" fireing is implemented, current behavior is compatible with those triggers.
  • Stage enumeration is now based on the Topological sort/from fusion output.
  • Minor error/shutdown logging improvements.

See #28187


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@codecov
Copy link

codecov bot commented Sep 19, 2023

Codecov Report

Merging #28545 (f321ab0) into master (3024ec2) will decrease coverage by 0.01%.
Report is 30 commits behind head on master.
The diff coverage is 50.00%.

@@            Coverage Diff             @@
##           master   #28545      +/-   ##
==========================================
- Coverage   72.22%   72.22%   -0.01%     
==========================================
  Files         684      684              
  Lines      100887   100901      +14     
==========================================
+ Hits        72870    72879       +9     
- Misses      26437    26448      +11     
+ Partials     1580     1574       -6     
Flag Coverage Δ
go 53.42% <50.00%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Changed Coverage Δ
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go 82.41% <0.00%> (-1.70%) ⬇️
...m/runners/prism/internal/jobservices/management.go 54.89% <0.00%> (-0.48%) ⬇️
...ks/go/pkg/beam/runners/prism/internal/urns/urns.go 100.00% <ø> (ø)
...o/pkg/beam/runners/prism/internal/worker/bundle.go 43.33% <0.00%> (ø)
...o/pkg/beam/runners/prism/internal/worker/worker.go 43.56% <14.28%> (-1.32%) ⬇️
...go/pkg/beam/runners/prism/internal/environments.go 29.37% <50.00%> (+0.29%) ⬆️
sdks/go/pkg/beam/runners/prism/internal/execute.go 87.23% <77.77%> (+1.88%) ⬆️
sdks/go/pkg/beam/runners/prism/internal/stage.go 82.97% <100.00%> (ø)

... and 3 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@github-actions
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @jrmccluskey for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Copy link
Contributor

@jrmccluskey jrmccluskey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just some clarification questions

@@ -95,7 +92,7 @@ func makeWorker(env string, j *jobservices.Job) (*worker.W, error) {
// Check for connection succeeding after we've created the environment successfully.
timeout := 1 * time.Minute
time.AfterFunc(timeout, func() {
if wk.Connected() {
if wk.Connected() || wk.Stopped() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case where Stopped() could be indicative of an error in worker start-up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The stopped bit is only set when Stop is called, which only happens when a context has cancelled, which only happens on post Job clean up.

Other errors would be reported earlier (ideally).

This approach largely prevents this ("worker didn't connect") function from failing successful jobs after the job completed, if the job never needed the environment at all. I intend to change worker environments to start on demand, and move worker startup to after preprocessing. As it stands, a minute+ job using one of these will just die. Won't happen for real xlang transforms though.

@@ -67,7 +67,7 @@ type W struct {
server *grpc.Server

// These are the ID sources
inst, bund uint64
inst uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now being pulled from the ProcessBundle instruction abstraction directly, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just the bundle descriptor IDs.

The shorter lived RPCs like Progress and Split still use the Instruction IDs from the worker side, rather than the "global" one in execute used by the ElementManager.

I haven't yet nailed down the best way to have "global" state accessed by the worker abstraction, vs dedicated state (like environment protos). It's getting there though.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants