
[Bug]: prism is not working with anonymous function #28981

Closed
1 of 16 tasks
jhw0604 opened this issue Oct 13, 2023 · 9 comments · Fixed by #29179

@jhw0604

jhw0604 commented Oct 13, 2023

What happened?

Prism does not work with anonymous functions.

The minimal_wordcount example does not work.

An anonymous function cannot be registered with beam.RegisterDoFn, and Beam fails when decoding it.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@johannaojeling
Contributor

Hi @jhw0604, which version are you using? The example should work with the latest, v2.51.0.

@lostluck
Contributor

They're right that it doesn't work (at least, synced to head).

Prism (and most portable runners) don't work with anonymous functions.

It is concerning that it simply hangs instead of failing properly, though. That's a separate issue.

I thought I had changed all of these to not be anonymous... :/

@lostluck
Contributor

Well, PR #29179 is up to fix the example.

Prism does seem to still have a gap somewhere for unregistered functions, preventing a failed pipeline from terminating. Filed #29180 to track that.

@johannaojeling
Contributor

I don't get any errors when using anonymous functions with Prism 🤔

Another example: if I modify the textio unit test TestReadWithFilename as below, the test still passes:

-	got := beam.ParDo(s, toKV, lines)
+	got := beam.ParDo(s, func(k, v string) kv {
+		return kv{k, v}
+	}, lines)

I saw your note in the PR @lostluck about anonymous functions not working on any portable runner. I remember I wrote a doc example for fileio.ReadMatches that uses one. That example runs on Prism and Dataflow, but I should probably change the code if we do not want to encourage users to use anonymous functions?

Interesting, but seemingly unrelated to anonymous functions(?): if I run a pipeline with textio.Read from a remote location such as GCS, sometimes it completes successfully right away, and sometimes it behaves unexpectedly while reading, which makes the pipeline look stuck. It makes a single initial split of the restriction but then does a lot of splitting while processing. When reading from a local file with lower latency, this doesn't seem to happen. Hm.. I've encountered some strange splitting behavior when working on the natsio.Read unbounded source transform, but thought it was some deficiency in my SDF code and haven't had the chance to debug it yet. I wonder if it's related.

@lostluck
Contributor

@johannaojeling

> It makes a single initial split of the restriction but then does a lot of splitting while processing.

That's probably from prism being unoptimized for splits. Turns out good splitting heuristics are hard!

Right now, basically only an SDF will cause splitting, since prism only splits if the channel progress indication hasn't moved since the last progress request (sent every ~100ms).

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/stage.go#L150
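A minimal sketch of the stall rule described above, assuming progress is summarized as a single increasing index per bundle. The stallDetector type and its shouldSplit method are hypothetical names for illustration, not Prism's actual code (stage.go at the link above has that).

```go
package main

import "fmt"

// stallDetector is a hypothetical helper, not Prism's implementation.
// It remembers the last progress index seen on the data channel and reports
// a stall when a new progress response (polled roughly every 100ms) shows
// no movement.
type stallDetector struct {
	last    int64
	started bool
}

// shouldSplit records the latest progress index and reports whether a split
// would be requested under the "no progress since the last poll" rule.
func (d *stallDetector) shouldSplit(current int64) bool {
	stalled := d.started && current == d.last
	d.last, d.started = current, true
	return stalled
}

func main() {
	d := &stallDetector{}
	// Simulated progress indices, one per progress poll.
	for _, p := range []int64{0, 5, 5, 9} {
		// prints, one per line: 0 false, 5 false, 5 true, 9 false
		fmt.Println(p, d.shouldSplit(p))
	}
}
```

Nothing in this rule looks at how much work is left, so a bundle that is merely slow (for example, waiting on a high-latency read) keeps qualifying for splits.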

Splitting to a standstill is definitely a bug. In principle, it should estimate the amount of work remaining (by using the sizes and similar) and only split if there's a "reasonable" amount of work to split across.
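One way the proposed guard could look, as a sketch: split only when progress has stalled and halving the remaining work would still leave each side at least some minimum. The remaining-work estimate, the minPerPart threshold, and the fixed split fraction of 0.5 are all assumptions for illustration; a runner could derive the estimate from restriction sizes or similar signals.

```go
package main

import "fmt"

// worthSplitting is a hypothetical guard, not Prism's code. remainingWork is
// an estimate in arbitrary units (e.g. bytes left in a restriction), and
// minPerPart is the smallest amount of work worth handing to each side.
func worthSplitting(stalled bool, remainingWork, minPerPart float64) bool {
	if !stalled || remainingWork <= 0 {
		return false
	}
	// Splitting at fraction 0.5 leaves roughly half the remaining work on each side.
	return remainingWork/2 >= minPerPart
}

// splitsUntilTooSmall simulates repeated stalls on one bundle: each accepted
// split halves the work kept by the primary, until the guard refuses.
func splitsUntilTooSmall(remainingWork, minPerPart float64) int {
	n := 0
	for worthSplitting(true, remainingWork, minPerPart) {
		remainingWork /= 2
		n++
	}
	return n
}

func main() {
	// 1000 -> 500 -> 250 -> 125, then 62.5 per side is below the threshold.
	fmt.Println(splitsUntilTooSmall(1000, 100)) // prints 3
}
```

Unlike the stall-only rule, the number of splits here is bounded by the estimated work rather than by how long the bundle stays slow.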


On the function execution portions...

That's very odd. I can't think of why it would be the case, though. Worth investigating and resolving, since the SDK failing to look up the function shouldn't cause a pipeline to hang.


For sure, closures simply cannot work, since the variables outside the function that they refer to aren't going to be initialized. If I had the time and energy, I'd figure out how to improve Go's "reflect" package to get at captured data in anonymous functions and be able to rehydrate them. But it's such a niche concern that it's uncertain how generally useful it would be. It would just have to be in reflect.
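A small stdlib-only illustration of why closures can't be rehydrated by name: two closures created from the same function literal share one compiled symbol, so a name is all a worker would get, while the captured value (n below) exists only in the launching process's memory. makeAdder and symbolName are hypothetical helpers, and the exact symbol string is a Go implementation detail.

```go
package main

import (
	"fmt"
	"reflect"
	"runtime"
)

// makeAdder returns a closure that captures n. The noinline directive keeps
// a single compiled copy of the closure, so every result shares one symbol.
//
//go:noinline
func makeAdder(n int) func(int) int {
	return func(x int) int { return x + n }
}

// symbolName returns the runtime symbol name of a function value, which is
// the only identity a name-based lookup on a worker could rely on.
func symbolName(f any) string {
	return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
}

func main() {
	add1, add2 := makeAdder(1), makeAdder(2)
	fmt.Println(symbolName(add1))                     // e.g. main.makeAdder.func1
	fmt.Println(symbolName(add1) == symbolName(add2)) // prints true
	fmt.Println(add1(10), add2(10))                   // prints 11 12
}
```

Same name, different behavior: the difference lives entirely in captured state that nothing serializes.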

Anonymous functions can work unregistered, but only because of a very sneaky, janky trick we do deep in the bowels of the system: we load the symbols from the DWARF debug data in the binary and scan them, looking for names we've skimmed earlier:

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/util/symtab/symtab.go

In combination with this: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/util/reflectx/functions.go#L25

To get the DWARF table, one needs to scan the binary, which can be arbitrarily large. (Usually not bad for most Go binaries, but if the binary has piles of cgo in it... it's big.)

As with any unsafe technique, these tricks don't work consistently and are very likely to break as the internals of Go shift around. For example, tests basically require registration, and that was the original use case.

Ultimately, because it only works sometimes it's a frustrating experience to have it shown off, only to not work. Hence my bias to recommending what does work, along with the most efficient way to do it.

@johannaojeling
Contributor

Ah I see. Thanks for explaining!

> Ultimately, because it only works sometimes it's a frustrating experience to have it shown off, only to not work. Hence my bias to recommending what does work, along with the most efficient way to do it.

Agree with this.

@jhw0604
Author

jhw0604 commented Oct 31, 2023

@lostluck
I think anonymous DoFns are an important feature.

They make the code simpler and the code flow easier to read.

Above all, the official website shows this feature:
formatted := beam.ParDo(s, func(w string, c int) string { return fmt.Sprintf("%s: %v", w, c) }, counted)

We could hide the problem by removing it from the example, but
I think this is a feature that must be supported.

Rather than deleting the example, it seems better to have Prism return a runtime error saying that anonymous DoFns are not yet supported.

@lostluck
Contributor

Ah, good find. That page definitely needs to be updated too. Thank you!

Technically it's not Prism returning the error, it's the SDK. As a portable runner, Prism doesn't know very much about the SDK it's running. The problems also apply to Flink, Dataflow, Spark etc.

The Go direct runner, which isn't supportable and, by definition, isn't scalable, can't properly ensure your pipeline is correct, which is why it's deprecated.

But as stated, it's not possible to reliably support it in the SDK as it is. We'd need a different Go SDK to manage it.


I will say that if it could be supported reliably, it would be my first choice, but I've not seen any good mechanism for it that won't be an ongoing maintenance headache. Manual registration works every time, is necessary for unit testing anyway, and doesn't encourage closures, which cause more problems because we can't serialize the closure.

But until we can reliably support anons, it's better to not make the most brittle part of the SDK be front and center in the examples.

@jhw0604
Author

jhw0604 commented Oct 31, 2023

I think I understand why you think that.
We need to find a way to make the SDK support anonymous functions, rather than modifying Prism.
