
[Bug]: Python SDK gets stuck when using Unbounded PCollection in streaming mode on GroupByKey after ReadFromKafka on DirectRunner, FlinkRunner and DataflowRunner #22809

Closed
alexmreis opened this issue Aug 22, 2022 · 31 comments


@alexmreis

alexmreis commented Aug 22, 2022

What happened?

Consider the trivial example pipeline below:

"""
Reproduce the KafkaIO + Unbounded source + streaming mode bug.
"""

import logging
import os

import apache_beam as beam
from apache_beam.io.external import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

logging.getLogger().setLevel(logging.DEBUG)

def kafka_consumer_config():
    """
    Returns config for the KafkaIO source.
    """
    return {
            "bootstrap.servers": os.getenv("KAFKA_BROKER_URL"),
            "auto.offset.reset": "latest",
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": "PLAIN",
            "group.id": os.getenv("KAFKA_GROUP_ID"),
            "enable.auto.commit": "true",
            "sasl.jaas.config": f"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{os.getenv('KAFKA_SASL_USERNAME')}\" password=\"{os.getenv('KAFKA_SASL_PASSWORD')}\";",
            }

with beam.Pipeline(options=PipelineOptions()) as pipeline:
    _ = (
        pipeline
        | "Read from kafka" >> kafka.ReadFromKafka(
            kafka_consumer_config(),
            [os.getenv("KAFKA_TOPIC")])
        | "Fixed window 5s" >> beam.WindowInto(window.FixedWindows(5))
        | "Group by key" >> beam.GroupByKey()
        | "Print" >> beam.Map(lambda t: logging.warning("%s - %s", t[0], t[1]))
    )

When this pipeline is run in at least these three environments:

  • DataflowRunner (streaming mode)
  • FlinkRunner (streaming mode, locally; not tested on a cluster)
  • DirectRunner (streaming mode)

All of them get stuck on the GroupByKey PTransform. The trigger apparently never fires, although that is impossible to confirm from the logging I get.

When max_num_records is added to the ReadFromKafka step, effectively turning the source into a bounded collection, the pipeline works in both batch and streaming mode in all of the environments listed above.
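For reference, a minimal sketch of that bounded workaround, reusing the imports and the kafka_consumer_config() helper from the example above; the limit of 1000 records is an arbitrary illustration, and max_read_time could be used instead:

_ = (
    pipeline
    | "Read bounded from kafka" >> kafka.ReadFromKafka(
        kafka_consumer_config(),
        [os.getenv("KAFKA_TOPIC")],
        # Capping the record count turns the unbounded source into a
        # bounded PCollection, which is the configuration that works.
        max_num_records=1000)
    | "Fixed window 5s (bounded)" >> beam.WindowInto(window.FixedWindows(5))
    | "Group by key (bounded)" >> beam.GroupByKey()
    | "Print (bounded)" >> beam.Map(lambda t: logging.warning("%s - %s", t[0], t[1]))
)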

Data is timestamped in Kafka using processing time, although it is unclear from the documentation whether the KafkaIO adapter in Beam automatically timestamps entries in the source PCollection it generates.

I have also tried timestamping them manually using with_metadata and the msg.timestamp property returned, to no avail.
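For context, a hedged sketch of what that manual timestamping attempt looks like. The msg.key / msg.value / msg.timestamp field names and the assumption that the Kafka timestamp is in epoch milliseconds (hence the division by 1000) come from this description, not from verified documentation:

_ = (
    pipeline
    | "Read from kafka with metadata" >> kafka.ReadFromKafka(
        kafka_consumer_config(),
        [os.getenv("KAFKA_TOPIC")],
        with_metadata=True)
    | "Stamp with kafka timestamp" >> beam.Map(
        # Re-stamp each element with the Kafka record timestamp; Beam
        # timestamps are in seconds, so the assumed-milliseconds value
        # is divided by 1000.
        lambda msg: window.TimestampedValue((msg.key, msg.value),
                                            msg.timestamp / 1000.0))
    | "Fixed window 5s (stamped)" >> beam.WindowInto(window.FixedWindows(5))
    | "Group by key (stamped)" >> beam.GroupByKey()
    | "Print (stamped)" >> beam.Map(lambda t: logging.warning("%s - %s", t[0], t[1]))
)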

If I look at the Beam test suite, I see the ReadFromKafka PTransform is only tested without windowing and without grouping in Python. Should this maybe be added?

This impacts all Python workloads reading from Kafka, and it seems rather surprising that no one else has run into this yet.

Issue Priority

Priority: 3

Issue Component

Component: io-java-kafka

@alexmreis
Author

I can also confirm that the same problem does not happen when using the Java SDK, which points to some sort of issue in the portability layer.

@alexmreis
Author

alexmreis commented Aug 26, 2022

I can also confirm that the same pipeline works on DataflowRunner with GCP PubSubLite, which is also a Java cross-language IO adapter, so the issue is not in the cross-language portability layer itself either. Could it be the way KafkaIO handles timestamps?

On DirectRunner, though, messages remain pending on the PubSubLiteIO component's watermark and windows never get triggered. On FlinkRunner the environment variable with the GCP credentials path is not passed through to the Java SDK, and it fails at that point.

@johnjcasey johnjcasey added P2 and removed P3 labels Sep 19, 2022
@aromanenko-dev
Contributor

CC: @chamikaramj

@amontoli

amontoli commented Nov 8, 2022

I have a very similar issue, but instead of the Kafka module in Beam I am using the Kafka module in beam_nuggets, a wrapper around the Kafka Python client. With this source I have to add the timestamp by hand using beam.window.TimestampedValue.
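For illustration, a minimal sketch of that by-hand timestamping step, assuming the source emits plain (key, value) tuples; nothing here is specific to beam_nuggets:

import time

import apache_beam as beam
from apache_beam.transforms import window


class AddProcessingTimestamp(beam.DoFn):
    """Stamps each element with the current processing time by hand."""

    def process(self, element):
        # TimestampedValue expects a timestamp in seconds since the epoch;
        # downstream windowing then uses this value as the event time.
        yield window.TimestampedValue(element, time.time())


# Usage: ... | beam.ParDo(AddProcessingTimestamp()) | beam.WindowInto(...)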

I tried to analyze the data after applying the window transformation using the AnalyzeElement class defined here (Example 2). Data is correctly assigned to a window, but GroupByKey never gets called.
I have used the Direct Runner and the portable Flink runner. I have also tried using a non-default trigger (trigger=trigger.AfterWatermark()).

I do not know if it is related, but I have also tried reading from a file with ReadFromText and the streaming pipeline option: data is processed line by line up to the GroupByKey step, but that step only produces output after the whole file has been read, as if the window trigger does not fire until the PCollection has ended.

@johnjcasey
Contributor

Ok, I've dug into this quite a bit:

  1. It appears to be unrelated to the underlying use of Kafka UnboundedSource vs. Kafka SDF
  2. Data flows normally with no window+gbk
  3. with
    | "Fixed window 5s" >> beam.WindowInto(window.FixedWindows(5))
    | "Group by key" >> beam.GroupByKey()
    data does not appear to flow normally, but will show up on a drain request to Dataflow
  4. with
    | "Fixed window 5s" >> beam.WindowInto(window.FixedWindows(5), trigger=Repeatedly(AfterProcessingTime()), accumulation_mode=AccumulationMode.ACCUMULATING)
    | "Group by key" >> beam.GroupByKey()
    Data appears to flow normally (see the sketch after this list). This suggests there is some issue with triggering in the default window, which may or may not be related to xlang
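A minimal Python sketch of the configuration from item 4, applied to the repro pipeline from the issue description (imports and helpers as above; only the WindowInto step differs):

from apache_beam.transforms.trigger import (
    AccumulationMode, AfterProcessingTime, Repeatedly)

_ = (
    pipeline
    | "Read from kafka (trigger test)" >> kafka.ReadFromKafka(
        kafka_consumer_config(),
        [os.getenv("KAFKA_TOPIC")])
    | "Fixed window 5s, PT trigger" >> beam.WindowInto(
        window.FixedWindows(5),
        # Fire repeatedly on processing time instead of waiting for the
        # watermark to pass the end of the window.
        trigger=Repeatedly(AfterProcessingTime()),
        accumulation_mode=AccumulationMode.ACCUMULATING)
    | "Group by key (trigger test)" >> beam.GroupByKey()
    | "Print (trigger test)" >> beam.Map(
        lambda t: logging.warning("%s - %s", t[0], t[1]))
)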

@chamikaramj
Contributor

cc: @lukecwik

@johnjcasey
Contributor

I've tried plain Java with a Kafka source, and it looks like SDF on runner v2 exhibits this as well, while unbounded on runner v1 works normally. I'm going to try SDF on v1 and unbounded on v2 to isolate the cause.

@johnjcasey
Copy link
Contributor

For whatever reason,
ExperimentalOptions.addExperiment(options, "use_deprecated_read");
ExperimentalOptions.addExperiment(options, "use_runner_v2");

together don't work, and the Kafka consumer fails to construct.

@johnjcasey
Contributor

And there isn't a way to use SDF on v1, as there is an override.

@johnjcasey
Contributor

ExperimentalOptions.addExperiment(options, "use_unbounded_sdf_wrapper");
ExperimentalOptions.addExperiment(options, "use_runner_v2");

The pipeline starts up correctly with these experiments, but exhibits the same behavior of failing to produce results.

This is certainly happening in the GroupByKey:
[screenshot]

@johnjcasey
Contributor

Well, the above is wrong. With

ExperimentalOptions.addExperiment(options, "use_unbounded_sdf_wrapper");
ExperimentalOptions.addExperiment(options, "use_runner_v2");

I am now seeing elements coming through. This appears to be an issue with the SDF source, not the unified worker.

@johnjcasey
Contributor

Which doesn't make much sense, as that was the original case that failed to produce data on Python...

@johnjcasey
Contributor

I've re-confirmed the base Python case does not work. So:

  • Python xlang with unbounded underlyer: does not work
  • Python xlang with SDF underlyer: does not work
  • Java with unbounded (runner v1): works
  • Java with SDF (runner v2): does not work
  • Java with unbounded (runner v2): works

I don't really know what is causing the issues

@johnjcasey
Contributor

Ok, it looks like Python with unbounded actually is working?

import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window


def log_ride(ride):
    logging.error("got a message %r", ride)

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  import argparse
  parser = argparse.ArgumentParser()

  known_args, pipeline_args = parser.parse_known_args()
  pipeline_options = PipelineOptions(
      pipeline_args, save_main_session=True, streaming=True)

  with beam.Pipeline(options=pipeline_options) as pipeline:
      (pipeline
      | ReadFromKafka(
          consumer_config={'bootstrap.servers': 'kafkaio-johnjcasey-load-test-m:9092'}, # host.docker.internal:9092
          topics=["quickstart-events"],
          with_metadata=False)
       | "Fixed window 5s" >> beam.WindowInto(window.FixedWindows(5))
       | "Group by key" >> beam.GroupByKey()
       | beam.FlatMap(lambda ride: log_ride(ride)))

with configs:

--runner=DataflowRunner
--region=us-central1
--project=google.com:clouddfe
--temp_location=gs://jjc-sandbox/temp
--staging_location=gs://jjc-sandbox/temp
--defaultSdkHarnessLogLevel=ERROR
--sdk_harness_container_image_overrides
".*java.*,gcr.io/google.com/clouddfe/johnjcasey/beam_java8_sdk:latest"

and I see:
[screenshot]

@johnjcasey
Contributor

So this is certainly an SDF error.

@johnjcasey
Contributor

Ok, after some experimentation, this appears to be caused when data is 'sparse' on Kafka.

We only advance the SDF watermark when we see records, so if there are no records for a partition (which can be 1:1 with Beam splits), we never advance the watermark for that DoFn. As a result the watermark never moves forward and the window never triggers.
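To make that concrete, a small self-contained TestStream sketch (no KafkaIO involved) showing that, with the default trigger, GroupByKey only emits a window after the watermark passes the window's end; a source whose watermark stalls on an empty partition never reaches that point, so the window never fires:

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms import window

stream = (
    TestStream()
    .add_elements([window.TimestampedValue(("key", 1), 1),
                   window.TimestampedValue(("key", 2), 2)])
    # The [0, 5) window below can only fire once the watermark moves past
    # its end; this explicit advance plays the role that reading further
    # records would play for a Kafka partition.
    .advance_watermark_to(10)
    .advance_watermark_to_infinity())

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    _ = (
        p
        | stream
        | beam.WindowInto(window.FixedWindows(5))
        | beam.GroupByKey()
        | beam.Map(lambda kv: logging.warning("window output: %s", kv)))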

@johnjcasey
Contributor

tyvm to @damccorm for the idea

@Abacn
Contributor

Abacn commented Jan 24, 2023

If I understood correctly, #24205 fixes the SDF implementation of Kafka read, but

Python xlang with unbounded underlyer: does not work

still persists, i.e. Java unbounded works but Python does not, which suggests there is another cause on the Python side.

edit: missed #22809 (comment)

@hadikoub

Any updates on this issue?
Is there a suggested fix?

@Abacn
Contributor

Abacn commented Jan 24, 2023

The issue from the Java side (SDF implementation) should be fixed by #24205 and will ship with the upcoming v2.45.0. @johnjcasey could not reproduce the Python issue with the legacy implementation, but there is also another report, #25114 (we temporarily changed the Python SDK to use the legacy implementation from v2.42.0). I am going to take a look.

@Abacn
Contributor

Abacn commented Jan 26, 2023

Giving up on testing locally... The Python direct runner does not quite support streaming and gets stuck indefinitely; the Flink runner does not see incoming data, then after waiting ~1 minute the following error happens and the job fails:

INFO:apache_beam.utils.subprocess_server:Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 192.168.10.176:56431-3bf602 timed out.
INFO:apache_beam.utils.subprocess_server:       ... 30 more
INFO:apache_beam.utils.subprocess_server:
ERROR:root:java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 192.168.10.176:56431-3bf602 timed out.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED

Setting max_records or max_read_time does make records show up once the record limit or read timeout is reached.

Will test on Dataflow runner tomorrow

We definitely need to improve the test infrastructure. The missing direct runner support has generated substantial confusion...

@hadikoub

Thank you for your reply.
If we set max_records and max_read_time it works as suggested, but is that suitable as a fix in a production environment?
Also, if I understand correctly, this makes streaming data from Kafka impossible with the provided runners, or am I missing something?

@Abacn
Contributor

Abacn commented Jan 27, 2023

Confirmed that the master branch works as expected on the Dataflow runner:
[screenshot]

job: https://ci-beam.apache.org/job/beam_PerformanceTests_xlang_KafkaIO_Python/47/console

Pipeline setup adapted from the Python xlang Kafka performance test: https://github.com/apache/beam/blob/8978e6375a52d9e676539bfaef2a4e35775443bb/sdks/python/apache_beam/io/external/xlang_kafkaio_perf_test.py

_ = (
        self.pipeline
        | 'ReadFromKafka' >> kafka.ReadFromKafka(
            consumer_config={
                'bootstrap.servers': self.test_options.bootstrap_servers,
                'auto.offset.reset': 'earliest'
            },
            topics=[self.kafka_topic])
        | beam.Map(lambda x: ('0', x))
        | "Fixed window 5s" >> beam.WindowInto(window.FixedWindows(5))
        | "Group by key" >> beam.GroupByKey()
        | 'Print Record Fn' >> beam.Map(printRec))

Update: tested the SDF read version on Dataflow as well; elements are emitted as expected.

@chamikaramj
Contributor

Note that portable runners may run into #20979 when reading from Kafka.

@Abacn
Contributor

Abacn commented Jan 27, 2023

Thanks @chamikaramj for stepping in. Is the unbounded read also affected by #20979? I tested on the local Flink runner and neither the unbounded nor the SDF read emitted records.

@chamikaramj
Contributor

It only affects SDF but UnboundedSources get converted to SDFs when used on portable runners. Non-portable Spark/Flink should not be affected by that bug.

@Abacn
Contributor

Abacn commented Feb 3, 2023

In this case, I am going to close this issue, as the DataflowRunner use case is resolved by #24205, and use #25114 to track the portable runners (Python direct, Flink).

@Abacn Abacn closed this as completed Feb 3, 2023
@github-actions github-actions bot added this to the 2.46.0 Release milestone Feb 3, 2023
@Abacn Abacn modified the milestones: 2.46.0 Release, 2.45.0 Release Feb 3, 2023
@pof-declaneaston

Hello everyone. I am trying to build a Python Dataflow pipeline with Kafka as the input. I am experiencing issues with consuming from Kafka with both the DirectRunner and the DataflowRunner. If I add max_records I can get data from the DirectRunner, but I haven't been able to consume messages with the DataflowRunner. I think the Dataflow issue might actually be related to networking between GCP and my on-prem environment (I am working on that), but it looks like others have struggled to get Dataflow working correctly.

I can see a couple of different tickets related to this issue and I wanted to ask for some clarity on the situation as there is a lot of information:

  1. Is there a workaround for the issue on Dataflow with v2.44.0 or earlier?
  2. It looks like the issue in the Dataflow runner is being addressed in v2.45.0. Is there any estimate on when that version will be available to the public? An RC release would work well enough.
  3. Will the issue in the DirectRunner be addressed in an upcoming release?

Thanks a lot for any help!

@chamikaramj
Contributor

If I add max_records I can get data from the DirectRunner but I haven't been able to consume messages with the DataflowRunner.

This likely indicates that the issue you are running into on Dataflow is unrelated to the other issues mentioned here. Possibly it's due to Dataflow workers not being able to connect to your on-prem cluster, but it's hard to say without looking at the job. If you file a Dataflow support ticket, they should be able to look at your specific job.

The issue #20979 mentioned above should not affect Dataflow.

@pof-declaneaston

Sorry, I am not setting max_records on my Dataflow jobs. I can try that to rule out networking issues, though.

@pof-declaneaston

I have been able to consume with v2.43.0 from Dataflow.
