[Bug]: Difficult to use Portable Flink Runner with Go #26485
I totally agree that we need better documentation of portable runners in general. I've been digging through the Python and Go SDK code for days trying to get portable runners to work, and the official documentation was bad.

It seems like you misunderstood the LOOPBACK environment. Loopback starts a server on your host machine (not the Flink node) and is solely for debugging locally. The Beam-generated code running on Flink will connect back to your Go code on your machine to run UDFs, so it's bounded by local memory.

As for the DOCKER environment, the Beam code will try to run the `apache/beam_go_sdk` image on the Flink node, so you need to be able to run docker on that machine; to run docker inside a container, you need to set the container's privileged flag.

Another important thing to note: you need to mount the job server's artifact directory into the SDK harness containers, as they expect to find artifacts in the same directory. I'm not a Beam expert though, so please take all of this with a grain of salt.
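To make the shared-artifact-directory point concrete, here is a minimal sketch of how a job server and a harness container might share a staging volume. The image tag, ports, and the `/tmp/beam-artifacts` path are assumptions for illustration, not taken from this thread:

```shell
# Create a volume that every container mounts at the SAME path, since the
# SDK harness looks for staged artifacts under the directory the job
# server wrote them to.
docker volume create beam-artifacts

# Job server, told to stage artifacts into the shared directory.
docker run -d --name beam-job-server \
  -v beam-artifacts:/tmp/beam-artifacts \
  -p 8099:8099 -p 8098:8098 \
  apache/beam_flink1.16_job_server:2.46.0 \
  --flink-master=flink-jobmanager:8081 \
  --artifacts-dir=/tmp/beam-artifacts
```

Any SDK harness container then needs `-v beam-artifacts:/tmp/beam-artifacts` as well, so both sides resolve the same paths.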
Here's a tip for anyone who stumbles upon similar issues with the Spark portable runner. The official documentation says you should use
@LibofRelax Do you have any sample configs or basic setup instructions, beyond what was said before, that would be useful? In particular, how are you running the runner image, and what version? Also, I cannot find
@jeremyje It's been a while since I gave up on using Beam in production, so this sample might not be fully correct. I also only used Spark; maybe the Flink runner doesn't have the problems I had.

Here's a sample docker compose config that kinda worked for me. It runs smoothly until UDF execution. There seems to be a version mismatch between the job server and the worker implementation, even though they have the same version tag: the worker expects a log endpoint from the job on the Spark worker, which the Spark worker does not seem to expose.

I suggest you try the docker-in-docker environment option; maybe the default docker image is consistent with the job implementation. I already set the privileged flag to true in the compose file for that.

```yaml
version: '3'
volumes:
  tmp:
services:
  spark:
    image: docker.io/bitnami/spark:3.1.2
    environment:
      - SPARK_MODE=master
    ports:
      - "8080:8080"
  spark-worker:
    image: docker.io/bitnami/spark:3.1.2
    privileged: true # To run the docker SDK harness
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=4g
      - SPARK_WORKER_CORES=1
      - BEAM_WORKER_POOL_IN_DOCKER_VM=1
      - DOCKER_MAC_CONTAINER=1
    ports:
      - "8081:8081"
      - "8100-8200:8100-8200"
    volumes:
      - tmp:/tmp
      - ./work/spark:/opt/bitnami/spark/work
  beam-python-workers:
    image: apache/beam_python3.10_sdk:2.49.0
    command: [ "--worker_pool" ]
    environment:
      - RUN_PYTHON_SDK_IN_DEFAULT_ENVIRONMENT=1
    volumes:
      - tmp:/tmp
  beam-job-server:
    image: apache/beam_spark3_job_server:2.49.0
    command: [ "--spark-master-url=spark://spark:7077" ]
    ports:
      - "4040:4040" # Spark job UI on the driver
      - "8099:8099" # Job endpoint
      - "8098:8098" # Artifact endpoint
    volumes:
      - tmp:/tmp
    depends_on:
      - spark
      - spark-worker
```
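Assuming the compose stack above is up, a submission against it would look roughly like the sketch below. The wordcount module and the worker pool's port 50000 (the Beam Python `--worker_pool` default, as I understand it) are assumptions based on the stock SDK, not on this thread:

```shell
# Submit a Python pipeline to the job server from the compose file above.
# EXTERNAL points the runner at the long-running worker-pool container
# instead of asking the Spark worker to spawn docker containers itself.
python -m apache_beam.examples.wordcount \
  --input=/tmp/input.txt \
  --output=/tmp/out \
  --runner=PortableRunner \
  --job_endpoint=localhost:8099 \
  --environment_type=EXTERNAL \
  --environment_config=beam-python-workers:50000
```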
@LibofRelax thanks a lot for this answer. I spent two weeks trying distinct combinations of Spark and Beam versions without success. With your tip, it worked!
Has there been any work on this? I'm struggling to understand how to make Beam work through the Flink task manager on Kubernetes, and Kubernetes obviously complicates things. With Python, it looks like you run a separate worker pool, but there are zero examples of doing this with Go. Even after several years of apparent Beam + Go support, nobody seems to have posted any code examples, and there's just a tiny handful of StackOverflow posts that don't have any solutions.
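For what it's worth, the Go SDK accepts the same portability flags as Python, so if a worker endpoint were available next to the task manager, submission would look roughly like this sketch. The sidecar address is hypothetical, and whether a Go worker pool can actually be run that way is exactly the gap this issue asks to document:

```shell
# EXTERNAL tells the runner not to launch a harness itself but to use a
# pre-existing worker endpoint (here assumed to be a sidecar on :50000
# in the same pod as the Flink task manager).
go run mypipeline.go \
  --runner=universal \
  --endpoint=beam-job-server:8099 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:50000
```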
Is this a prerequisite, then? Are there plans to work on it?
What happened?
Request to update the instructions on how to run the Apache Beam portable runner on Flink, and to cover Go. Also, test that the instructions work on newer versions of Flink (1.17 or 1.16) with updated runner docker images.
I followed the instructions on running Apache Flink with the Apache Beam 2.46.0 Go SDK. I'm hitting all sorts of issues and have not gotten my pipelines to work based on the instructions given at:
At best, I am able to bring up a Flink instance with a pipeline that processes a limited amount of data (~200 MB) and then stalls without any feedback. This pipeline runs in `environment_type=LOOPBACK` mode. I'm starting to think there's some weird compatibility issue, as I'd really prefer to run in the default `environment_type=DOCKER` mode. It's not clear if that mode is supported, given the example only goes up to Flink 1.14, which is no longer listed on the Flink site. It'd be nice to get updated documentation for running Beam on a Flink cluster that targets a newer version, ~Flink 1.17.
These are the steps I've taken:
Run the Pipeline with the following parameters:
```
go run mypipeline.go -runner flink -endpoint=coder:8099 -environment_type=LOOPBACK
root_transform_ids: "s1"
2023/04/29 07:40:03 Cross-compiling /home/coder/project/mypipeline.go as /tmp/worker-1-1682754003926484499
2023/04/29 07:40:05 Prepared job with id: go-job-1-1682754003926481819_79b82cf7-1e81-4c5a-a49d-253712a5c422 and staging token: go-job-1-1682754003926481819_79b82cf7-1e81-4c5a-a49d-253712a5c422
2023/04/29 07:40:05 Staged binary artifact with token:
2023/04/29 07:40:05 Submitted job: go0job0101682754003926481819-root-0429074005-d5669737_2f0246ce-150e-4e32-a234-e2f03f9c9222
2023/04/29 07:40:05 Job state: STOPPED
2023/04/29 07:40:05 Job state: STARTING
2023/04/29 07:40:05 Job state: RUNNING
2023/04/29 07:40:16 starting worker 1-1
```
Only `-environment_type=LOOPBACK` has worked for me to submit jobs, but it still fails after processing about 100 MiB of data, even though the memory limits are well above that (the `flink-conf.yaml` below sets `taskmanager.memory.process.size: 20g`). With the various ways of running the `job-server`, the most I get is a successful job submission. I've attempted other ways to work around the errors I've been seeing, such as no docker being available in the `apache/beam_flink1.15_job_server:2.46.0` container.
Versions
Attempt with `docker-compose` on the job server:
The `jeremyje/beam_flink1.15_job_server:2.46.0` docker image, built when trying to get docker mode to work.
My `conf/flink-config.yaml`, which mainly exposes all the ports outside of the machine so I can submit jobs from another machine.
Logs from the jobserver.
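For reference, a custom job-server image like the one mentioned above presumably just layers the docker CLI onto the stock image; a sketch of such a Dockerfile (the package choice is an assumption, not taken from this issue):

```dockerfile
# Hypothetical: stock Flink job server image plus the docker CLI, so that
# environment_type=DOCKER has a docker binary available to launch SDK
# harness containers.
FROM apache/beam_flink1.15_job_server:2.46.0
USER root
RUN apt-get update \
    && apt-get install -y --no-install-recommends docker.io \
    && rm -rf /var/lib/apt/lists/*
```

Note that a docker daemon still has to be reachable at runtime, e.g. by mounting `/var/run/docker.sock` into the container or running it privileged.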
Issue Priority
Priority: 3 (minor)
Issue Components