
Portable runners should be able to issue checkpoints to Splittable DoFn #20979

Open

damccorm opened this issue Jun 4, 2022 · 9 comments

damccorm (Contributor) commented Jun 4, 2022

To execute an unbounded Splittable DoFn over the Fn API in streaming mode properly, portable runners should regularly issue a split (ProcessBundleSplitRequest with fraction_of_remainder > 0) or simply a checkpoint (ProcessBundleSplitRequest with fraction_of_remainder == 0) to the SDK, so that the current bundle finishes processing instead of running forever.

Imported from Jira BEAM-11998. Original Jira may contain additional context.
Reported by: boyuanz.
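For context, a checkpoint over the Fn API control channel is just a split request whose fraction_of_remainder is 0. Below is a minimal sketch of what such a request could look like, built with the standard Beam portability protos; all ids here ('split-1', 'bundle-1', 'read-node') are hypothetical placeholders, not values from the original report.

from apache_beam.portability.api import beam_fn_api_pb2

# A checkpoint is a split with fraction_of_remainder == 0: the SDK should stop
# as soon as possible and hand the unprocessed remainder back as residuals.
checkpoint = beam_fn_api_pb2.InstructionRequest(
    instruction_id='split-1',  # hypothetical id for this control request
    process_bundle_split=beam_fn_api_pb2.ProcessBundleSplitRequest(
        instruction_id='bundle-1',  # hypothetical id of the running bundle
        desired_splits={
            'read-node': beam_fn_api_pb2.ProcessBundleSplitRequest.DesiredSplit(
                fraction_of_remainder=0,
                estimated_input_elements=1),
        }))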

LemonU commented Aug 18, 2022

I have managed to get the workaround of adding --experiments=use_deprecated_read mentioned in the original Jira ticket working.

First, you'll need to start the Java expansion service yourself. If you are deploying the expansion service on Docker, you can simply pull the Flink job server image (apache/beam_flink1.14_job_server:2.40.0 is what I used) and override the entrypoint with the following command:

java -cp '/opt/apache/beam/jars/*' org.apache.beam.sdk.expansion.service.ExpansionService 8097 \
--javaClassLookupAllowlistFile="*" \
--defaultEnvironmentType=<your environment type here> \
--defaultEnvironmentConfig=<your environment config here> \
--experiments=use_deprecated_read

Explanation of each of the flags:

-cp '/opt/apache/beam/jars/*': this is where the expansion service jars are located in the container (quoted so the shell passes the wildcard to the JVM instead of expanding it itself)

8097: this specifies the port the expansion service listens on

--javaClassLookupAllowlistFile="*": this is so that all transforms registered under the expansion service can be requested for external expansion

--defaultEnvironmentType=<your environment type here> and --defaultEnvironmentConfig=<your environment config here>: these specify the Environment that the Java transforms you request from this expansion service will execute in. Be advised, your pipeline's environment configs will not affect this value; the values set here for the expansion service override those of your pipeline.
That is, say you are running a Python pipeline with --environment_type=EXTERNAL --environment_config=localhost:50000, the expansion service was started with --defaultEnvironmentType=DOCKER, and you request expansion of the Kafka IO transforms from the expansion service. The resulting pipeline protobuf payload will have every stage's environment set to the EXTERNAL environment except the Kafka IO transforms you requested from the expansion service, which will be set to the DOCKER environment.

--experiments=use_deprecated_read: this is so that the legacy Read transform replaces the new SDF-based Kafka Read transform when the expansion service expands the Kafka IO stage (see the sketch below).
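To tie it together, here is a minimal sketch, assuming such an expansion service is running on localhost:8097, of a Python pipeline requesting the Kafka expansion from it. The broker address and topic are placeholders, and the use_deprecated_read behavior comes from the expansion service's flags above, not from this code.

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(['--streaming'])
with beam.Pipeline(options=options) as p:
    _ = (
        p
        | ReadFromKafka(
            consumer_config={'bootstrap.servers': 'broker:9092'},  # placeholder
            topics=['my-topic'],  # placeholder
            # Point at the manually started expansion service instead of
            # letting the SDK auto-start the default one.
            expansion_service='localhost:8097')
        | beam.Map(print))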

kennknowles (Member) commented

Since this is new functionality, I think P2 is the right level. This is still an important priority for portable runners to function properly with SDF.

kennknowles (Member) commented

CC @chamikaramj since this is tagged with xlang

Abacn (Contributor) commented Feb 3, 2023

This is coming back as we push forward with the SDF implementations for various sources (Kafka, GenerateSequence, etc.) and move toward runner v2 of the Dataflow runner. I have read the context of BEAM-11998 and would like to work on this.

Abacn (Contributor) commented Mar 2, 2023

Another problem related to this issue: when running PeriodicSequence on the Flink runner, the pipeline first runs for ~1 minute but then fails with this error:

Traceback (most recent call last):
  File ".../periodictest.py", line 82, in <module>
    test0(p)
  File ".../py38beam/lib/python3.8/site-packages/apache_beam/pipeline.py", line 601, in __exit__
    self.result.wait_until_finish()
  File "...py38beam/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py", line 614, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline ...-6ce3c5fde435 failed in state FAILED: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 100.81.162.100:53640-99d457 timed out.

The code is pretty simple:

import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse

with beam.Pipeline(options=pipeline_options) as p:
    result = (
        p
        | PeriodicImpulse(fire_interval=1.0)
        | beam.Reshuffle()
        | beam.Map(print))
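(pipeline_options is not shown in the comment; a minimal guess at a streaming Flink portable-runner setup, with localhost:8081 as a placeholder Flink master address, would be something like the following.)

from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=localhost:8081',  # placeholder address
    '--streaming',
])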

portikCoder commented

Guys, maybe it's a completely stupid question, but as far as I understand it, the workaround only works when a single worker/machine is doing the job. What about cases like Dataproc, where the load is theoretically spread and you cannot control everywhere how the expansion service is spun up?

kennknowles (Member) commented

The expansion happens before the job is launched. It isn't per-worker.

portikCoder commented May 23, 2024

Thanks!
BTW, I figured out that if I run the Beam job like

python -m beam_streamer \
  --runner=FlinkRunner \
  --flink_master=${FLINK_MASTER_URL} \
  --flink_submit_uber_jar \
  --environment_type=DOCKER \
  --experiments=use_deprecated_read  # <-- the relevant flag

then it passes the flag downwards correctly:

/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:135 [] - Pipeline_options: {'streaming': True, 'project': ..., 'experiments': ['use_deprecated_read', 'beam_fn_api'], ...

Well, scratch the above... I reinvented the wheel only to figure out that it's not enough; you definitely need that extra expansion service running manually, externally. 🤷

hemavenkatarangan-2 commented

Hi, is this issue resolved? I'm also facing the same issue as of now. Need a fix.
