Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug]: Python SDFs (e.g. PeriodicImpulse) running in Flink and polling using tracker.defer_remainder have checkpoint size growing indefinitely #27648

Open
3 of 15 tasks
nybbles opened this issue Jul 24, 2023 · 17 comments

Comments

@nybbles
Copy link

nybbles commented Jul 24, 2023

What happened?

Please see https://gist.github.com/nybbles/6e1f2ab31866b251ff754e22b71f8405 for code to replicate this problem.

Problem description

I am finding that using unbounded SDFs with Flink results in checkpoint sizes that grow without bound, which eventually results in the job failing and all further job submissions to fail, even for beam.transforms.periodicsequence.PeriodicImpulse in a very simple pipeline, given below.

My pipeline consists of an SDF that reads from an unbounded source, which means that when there are no new messages, the SDF must poll the unbounded source, with some timeout. I observed that when my SDF would do this polling behavior (using tracker.defer_remainder as described in https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint, the checkpoint size would grow.

This happens even if the unbounded source was empty, and hence my SDF simply executed a loop of polling the unbounded source and then calling tracker.defer_remainder and returning from the DoFn to relinquish control and wait to poll again.

I was concerned that I had implemented my SDF or my pipeline incorrectly, so I found beam.transforms.periodicsequence.PeriodicImpulse and tested it in a very simple pipeline, which is as follows (note that apply_windowing's value does not change the problematic behavior):

    with beam.Pipeline(options=runner_options) as pipeline:
        pcoll = pipeline | PeriodicImpulse(
            fire_interval=5, apply_windowing=True
        )
        pcoll | beam.Map(print)

This pipeline also results in growing checkpoint size.

The Flink cluster configuration and full source for the program to replicate the problem and the Docker compose to get the Flink cluster up and running are given below in the reproduction steps and in https://gist.github.com/nybbles/6e1f2ab31866b251ff754e22b71f8405.

In case it is helpful, I'll list the FLINK_PROPERTIES and PipelineOptions below.

      FLINK_PROPERTIES: &flink_properties |-
        historyserver.archive.clean-expired-jobs: true
        state.backend: hashmap
        state.checkpoints.dir: file:///tmp/beam_state/
        state.checkpoints.num-retained: 10
        jobmanager.rpc.address: host.docker.internal
        rest.address: host.docker.internal
        taskmanager.numberOfTaskSlots: 4
        taskmanager.memory.process.size: 2048m
    runner_options = PortableOptions(
        artifact_endpoint=f"{JOB_SERVER_HOSTNAME}:8098",
        environment_cache_millis=0,
        environment_config="apache/beam_python3.11_sdk:2.48.0",
        environment_options=None,
        environment_type="DOCKER",
        job_endpoint=f"{JOB_SERVER_HOSTNAME}:8099",
        job_server_timeout=60,
        runner = "PortableRunner",
        output_executable_path=None,
        sdk_worker_parallelism=0,
        state_backend = "filesystem",
        state_backend_storage_path = "file:///tmp/beam_state/",
        streaming = True,
        checkpointing_interval = STREAMING_CHECKPOINTING_INTERVAL,
        parallelism = 1,
        auto_watermark_interval = 500,
    )

See this email thread for more context: https://lists.apache.org/thread/7yjr1f24rdzwzofdty1h12w9m28o62sm.

Note on priority

I followed the linked guide for setting issue priorities and set this one to priority 1 because it seems like unbounded SDFs is an important component, running on Flink is an important usecase, and having arbitrary checkpoint size growth makes unbounded SDFs on Flink non-functional. My apologies in advance if this is the wrong priority level.

Reproduction steps

  1. Run a Flink cluster (i.e. the Gist above provides my Docker Compose-based Flink cluster. It uses a taskmanager image that can run Docker containers, needed for using environment_type="DOCKER").
  2. Run the attached driver program in the Gist above to create the job and submit it to the Flink cluster.
  3. Observe in the logs that the checkpoints grow in size, specifically the step with the SDF, (and in proportion to the number of calls to tracker.defer_remainder), despite the SDF not actually explicitly accumulating any state.
  4. The checkpoint size grows until the Java heap space is exhausted and the job is killed. Now when the job is resubmitted, it will always fail, because the job manager attempts to restore the job from the large checkpoint, resulting in Java heap space being exhausted again.

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Abacn
Copy link
Contributor

Abacn commented Jul 25, 2023

Thanks, there are long-standing issues for Flink checkpoint support on Python SDK: #21318 #20979

if interested open a fix would really appreciate.

@nybbles
Copy link
Author

nybbles commented Jul 25, 2023

@Abacn thanks for sharing the other issues for Flink SDF checkpoint support on Python SDK. I'm not sure what you're asking me though.. Are you asking me to create a fix for this issue?

I can try to do that, but I'll need some guidance, as I am not too familiar with the either Beam or Flink codebases.

@Abacn
Copy link
Contributor

Abacn commented Jul 27, 2023

Thanks @nybbles the findings in the issue description is already very specific and clearer than what I know before in terms of the root cause. Happy to help from the beam side

@Abacn
Copy link
Contributor

Abacn commented Jul 28, 2023

fyi Java Portable Runner has the same issue (submit a Java PeriodicImpulse using PortableRunner via FlinkJobService). Python flink runner is just a wrapper to use the job service jar. So the fix should be in Java portable runner code side.

EDIT: Java PeriodicImpulse using PortableRunner actually works. The test forgot to add --streaming flag.

added --streaming flag does not change. Still fail after some seconds

@nybbles
Copy link
Author

nybbles commented Jul 28, 2023

Awesome thank you that's a helpful pointer.

Do you mean that the cause is likely to be somewhere here? https://github.com/apache/beam/tree/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability

I.e. the cause is before the pipeline is translated into protobuf and sent to https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L37?

@Abacn
Copy link
Contributor

Abacn commented Jul 31, 2023

I am not very familiar with, one thing could note is that the splittable DoFn support is missing for "portable" Flink runner job, which is the case of Python SDK jobs.

Maybe one start point is here. Will dig into.

@Abacn
Copy link
Contributor

Abacn commented Jul 31, 2023

caused by #19637

@nybbles
Copy link
Author

nybbles commented Jul 31, 2023

That's really concerning that splittable DoFn support is missing for the portable Flink runner job, as this Beam/Flink capability matrix implies the opposite: https://beam.apache.org/documentation/runners/capability-matrix/unbounded-splittable-dofn-support-status/.

@Abacn
Copy link
Contributor

Abacn commented Aug 1, 2023

I am testing Java portable runner and is making a little bit progress

if (ExperimentalOptions.hasExperiment(
        pipeline.getOptions(), "beam_fn_api_use_deprecated_read")
        || ExperimentalOptions.hasExperiment(pipeline.getOptions(), "use_deprecated_read")) {
      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
    }

in PortableRunner.run(), one is able to get GenerateSequence working for portable Flink runner. Same should happen for KafkaIO

@nybbles would you mind testing your kafka pipeline with option --experiments=use_deprecated_read ? (PeriodicImpulse does not work because its a pure SDF; while KafkaIO still has an UnboundedRead implementation)

@nybbles
Copy link
Author

nybbles commented Aug 2, 2023

@Abacn thanks for continuing to dig into this!

I'm not actually using KafkaIO. I wrote an SDF to read from a Redis stream. Can I simply pass the option you mentioned in to test, or is there anything rust I should do for my Redis stream SDF?

I wrote this Redis stream SDF using the Python SDK.

@nybbles
Copy link
Author

nybbles commented Aug 2, 2023

Here's roughly what the code for my
Redis stream SDF looks like: https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. I've since done some simplifications and fixed some mistakes, but was still seeing checkpoint size growth.

@Abacn
Copy link
Contributor

Abacn commented Aug 2, 2023

Hi @nybbles, I see, for pure SDF this option (use_deprecated_read) then won't work. This option was for the deprecated UnboundedRead (only exist in Java SDK)

Nevertheless given that UnboundedRead execution is not broken on portable flink streaming, one should be able to fix translateExecutableStage (##27648 (comment)) according to it to fix at least the basic functionality of unbounded SDF

@AyWa
Copy link
Contributor

AyWa commented Aug 20, 2023

@nybbles would you mind testing your kafka pipeline with option --experiments=use_deprecated_read ? (PeriodicImpulse does not work because its a pure SDF; while KafkaIO still has an UnboundedRead implementation)

I can confirm that it is working properly ! The way to make it works is to set some expansion service params like:

import apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

ReadFromKafka(
  consumer_config=...
  expansion_service=default_io_expansion_service(append_args=['--experiments=use_deprecated_read'])
)

However I have still some problem to make it works in the flink k8s operator, because of beam artifacts resolution. But I tried to make a fix See #28068

@nybbles
Copy link
Author

nybbles commented Oct 15, 2023

@nybbles would you mind testing your kafka pipeline with option --experiments=use_deprecated_read ? (PeriodicImpulse does not work because its a pure SDF; while KafkaIO still has an UnboundedRead implementation)

I can confirm that it is working properly ! The way to make it works is to set some expansion service params like:

import apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

ReadFromKafka(
  consumer_config=...
  expansion_service=default_io_expansion_service(append_args=['--experiments=use_deprecated_read'])
)

However I have still some problem to make it works in the flink k8s operator, because of beam artifacts resolution. But I tried to make a fix See #28068

I'm not using KafkaIO, but rather am running a pure SDF implemented using the Python SDK.

@nybbles
Copy link
Author

nybbles commented Oct 15, 2023

Hi @nybbles, I see, for pure SDF this option (use_deprecated_read) then won't work. This option was for the deprecated UnboundedRead (only exist in Java SDK)

Nevertheless given that UnboundedRead execution is not broken on portable flink streaming, one should be able to fix translateExecutableStage (##27648 (comment)) according to it to fix at least the basic functionality of unbounded SDF

@Abacn sorry for the silence; I needed to meet a deadline so I ended up finding a temporary solution (calling defer_remainder as little as possible, to delay the issue I was seeing). However, this workaround is not really sufficient anymore, so I need to look into this issue again.

I also turned off checkpointing and am still seeing this issue (now it is Java heap space growth, proportional to the number of calls to defer_remainder that I do).

I am taking a look at translateExecutableStage, but I'm not really sure what to fix to allow for basic functionality of unbounded SDF (or at least to stop this memory leak from occurring). I would appreciate some more guidance here - where could this memory leak be coming from, how I could track it down, and/or what is meant by fixing translateExecutableStage?

@nybbles
Copy link
Author

nybbles commented Oct 15, 2023

I took a heap dump from running my pipeline that uses a pure SDF to read from Redis (should have similar memory leak to the minimal example I posted that uses PeriodicImpulse.

The retained memory is primarily in registeredKVStates, a java.util.HashMap that is in the keyedStateStore within the runtimeContext in ExecutableStageDoFnOperator.

Screenshot from 2023-10-15 15-53-56

@nybbles
Copy link
Author

nybbles commented Oct 16, 2023

@Abacn I'm having trouble finding UnboundedRead in the Java SDK. Is this what you were referring to? https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/Read.Unbounded.html.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants