[Bug]: ReadFromKafka not forwarding in streaming mode version on portable runners #25114

Open
1 of 15 tasks
jihad-akl opened this issue Jan 22, 2023 · 26 comments

Comments

@jihad-akl

jihad-akl commented Jan 22, 2023

What happened?

ReadFromKafka is not forwarding messages to the next step in streaming mode.
Using apache-beam 2.44.0.

import json

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

topic = ["multi-video-stream"]

beam_options = PipelineOptions(streaming=True)
pipeline = beam.Pipeline(options=beam_options)

messages = (
    pipeline
    | 'Read from Kafka' >> ReadFromKafka(
        consumer_config=json.load(open("config/consumer_config_beam.json")),
        topics=topic)
    | 'Print messages' >> beam.Map(lambda message: print("received!")))

Hello, with the code above, the pipeline gets stuck on ReadFromKafka.
Adding max_num_records only waits for that specific number of records, forwards them to the next step, and then the pipeline ends.
(I am using the DirectRunner because I need to run the code locally.)
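
For illustration, a minimal sketch of the bounded behaviour described above, assuming the same config file and topic as the snippet; max_num_records is an existing ReadFromKafka parameter that turns the read into a bounded one:

import json

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only: with max_num_records the read becomes bounded, so the pipeline
# waits for that many records, forwards them, and then finishes (the behaviour
# described above) instead of streaming indefinitely.
with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    (p
     | 'Read from Kafka' >> ReadFromKafka(
         consumer_config=json.load(open("config/consumer_config_beam.json")),
         topics=["multi-video-stream"],
         max_num_records=10)
     | 'Print messages' >> beam.Map(lambda message: print("received!", message)))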

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@jihad-akl jihad-akl changed the title [Bug]: [Bug]: ReadFromKafka not forwarding in streaming mode version 2.44.0 Jan 22, 2023
@Abacn
Contributor

Abacn commented Jan 22, 2023

A streaming pipeline by definition will not end; besides, the Python direct runner is not meant for production and does not have full support for streaming. This is most likely working as intended.

@Abacn Abacn added P2 and removed P1 labels Jan 22, 2023
@jihad-akl
Author

jihad-akl commented Jan 23, 2023

True, so how can I use an Apache Beam pipeline in streaming mode if it only gathers data and does not send it to the next step?
The "received!" print does not trigger every time I receive a message from Kafka locally. Isn't that an important bug? Does the direct runner need some fixes?

@tvalentyn
Contributor

#24528 tracks various issues related to the streaming direct runner. I am not sure whether it is able to run a simple KafkaIO pipeline. Are you able to use a portable Flink Runner, by chance?

@jihad-akl
Author

I am trying to implement it, but so far I am facing the same issue: I can see my pipeline in the Apache Flink UI at localhost:8081, but nothing happens. I am debugging it to see whether I made any mistake.

@jihad-akl
Author

So after researching and testing, I found that the Flink Runner does not help, because Apache Flink gathers a lot of data before releasing it. My use case is that every message I receive from Kafka must be forwarded to the next step in the pipeline (locally).
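
As a hedged aside (not confirmed as a fix anywhere in this thread): on the portable Flink runner, streaming output can be held back until a checkpoint completes, so a short checkpointing interval is sometimes set on the runner options to reduce that buffering. A sketch:

from apache_beam.options.pipeline_options import PipelineOptions

# Hedged sketch only: --checkpointing_interval is a Flink runner option (in
# milliseconds). Whether it unblocks this particular ReadFromKafka case is not
# confirmed in this thread.
beam_options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.15",
    "--flink_master=localhost:8081",
    "--environment_type=LOOPBACK",
    "--streaming",
    "--checkpointing_interval=1000",
])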

@Abacn
Contributor

Abacn commented Jan 24, 2023

FYI, if the Flink runner has the same issue, it may be hitting #22809; the issue on the Python side may still persist.
CC: @johnjcasey
I will also take a look.

@jihad-akl
Author

To reproduce:
consumer_config.json:
{
  "bootstrap.servers": "127.0.0.1:9092"
}
main.py:
import json

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

topic = ["multi-video-stream"]

beam_options = PipelineOptions(["--runner=FlinkRunner", "--flink_version=1.15", "--flink_master=localhost:8081",
                                "--environment_type=LOOPBACK", "--streaming"])
with beam.Pipeline(options=beam_options) as p:
    messages = p | 'Read from kafka' >> ReadFromKafka(consumer_config=json.load(open("consumer_config.json")),
                                                      topics=topic)
    messages | 'Print Messages' >> beam.Map(print)
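
For reference, ReadFromKafka with the default deserializers emits (key, value) tuples of raw bytes, so a small decoding step (the helper name below is only illustrative) is often placed before printing, inside the same with-block:

    def decode_message(kv):
        # Illustrative helper: with the default deserializers, key and value are bytes.
        key, value = kv
        return (key.decode("utf-8") if key is not None else None,
                value.decode("utf-8"))

    messages | 'Decode' >> beam.Map(decode_message) | 'Print decoded' >> beam.Map(print)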

@jihad-akl
Author

jihad-akl commented Jan 25, 2023

I am using this producer_config.json:
{
  "bootstrap.servers": "localhost:9092",
  "enable.idempotence": true,
  "retries": 100,
  "max.in.flight.requests.per.connection": 5,
  "compression.type": "snappy",
  "linger.ms": 5,
  "batch.num.messages": 1,
  "queue.buffering.max.ms": 0,
  "queue.buffering.max.messages": 10
}
and this Kafka docker-compose file:
https://github.com/conduktor/kafka-stack-docker-compose/blob/master/zk-single-kafka-single.yml
for the producer code:

import json
import time

from confluent_kafka import Producer


def delivery_report(err, msg):
    # Minimal delivery callback (the original definition was not included in this comment).
    if err is not None:
        print("Delivery failed:", err)


producer = Producer(json.load(open("producer_config.json")))
frame_no = 0
while True:
    frame_bytes = "hello" + str(frame_no)
    producer.produce(
        topic="multi-video-stream",
        value=frame_bytes,
        on_delivery=delivery_report,
        timestamp=frame_no,
        headers={
            "test": str.encode("test")
        }
    )
    frame_no += 1
    # producer.poll(1)
    producer.flush()

    time.sleep(0.1)

@jihad-akl
Author

jihad-akl commented Jan 25, 2023

Please note that if I use Flink runner 1.14 I get:

ERROR:apache_beam.utils.subprocess_server:Starting job service with ['java', '-jar', '/root/.apache_beam/cache/jars/beam-runners-flink-1.14-job-server-2.44.0.jar', '--flink-master', 'http://localhost:8081', '--artifacts-dir', '/tmp/beam-temp1of29sbe/artifactsz8je29uo', '--job-port', '33755', '--artifact-port', '0', '--expansion-port', '0']
ERROR:apache_beam.utils.subprocess_server:Error bringing up service
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/utils/subprocess_server.py", line 88, in start
    raise RuntimeError(
RuntimeError: Service failed to start up with error 1
Traceback (most recent call last):
  File "main.py", line 38, in <module>
    with beam.Pipeline(options=beam_options) as p:
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/pipeline.py", line 600, in __exit__
    self.result = self.run()
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/pipeline.py", line 577, in run
    return self.runner.run_pipeline(self, self._options)
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/portability/flink_runner.py", line 45, in run_pipeline
    return super().run_pipeline(pipeline, options)
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/portability/portable_runner.py", line 439, in run_pipeline
    job_service_handle = self.create_job_service(options)
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/portability/portable_runner.py", line 318, in create_job_service
    return self.create_job_service_handle(server.start(), options)
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/portability/job_server.py", line 81, in start
    self._endpoint = self._job_server.start()
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/portability/job_server.py", line 110, in start
    return self._server.start()
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/utils/subprocess_server.py", line 88, in start
    raise RuntimeError(
RuntimeError: Service failed to start up with error

@Abacn
Contributor

Abacn commented Jan 27, 2023

Per #22809, the cause is likely #20979. It is due to a feature lacking in the Python portable runner. The Dataflow runner is not affected. What I am not sure about is why the unbounded reader is also not working.
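
For anyone experimenting with the SDF gap mentioned above: Beam has a use_deprecated_read experiment that asks a runner to use the legacy (non-SDF) read translation. This thread does not confirm that it helps the cross-language ReadFromKafka on portable runners, so treat the sketch below purely as something to try:

from apache_beam.options.pipeline_options import PipelineOptions

# Hedged sketch: request the legacy (non-SDF) read path via an experiment flag.
# Not confirmed in this thread to work for the cross-language Kafka read.
beam_options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--environment_type=LOOPBACK",
    "--streaming",
    "--experiments=use_deprecated_read",
])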

@vjixy

vjixy commented Jan 29, 2023

So there are bugs in the portable runners? :(

@Abacn Abacn changed the title [Bug]: ReadFromKafka not forwarding in streaming mode version 2.44.0 [Bug]: ReadFromKafka not forwarding in streaming mode version on portable runners Feb 3, 2023
@Abacn
Contributor

Abacn commented Feb 3, 2023

Yes, or a missing feature.

@alexmreis

The implementation of Kafka in the Python SDK + Portable Runner is unfortunately rather broken for streaming use cases. I don't understand why there isn't a native python implementation based on https://github.com/confluentinc/confluent-kafka-python that doesn't have to deal with the portability layer. It would be much more reliable, even if maybe less capable of parallel compute.

Our company has abandoned Beam and Dataflow for this very reason. The last bug I opened, in August 2022 (#22809), was closed today but still depends on two other issues, one of which (#25114) remains unsolved half a year later. The Python SDK is clearly not a priority for the core team. Maybe they're too busy focusing on GCP-specific products like PubSub to put in the effort to make open-source tools, like Kafka, work properly in Beam's Python SDK. There isn't even a single unit test in the test suite for an unbounded Kafka stream being windowed and keyed.

As someone who really believes in Beam as a great portable standard for data engineering, it's sad to see the lack of interest from the core team in anything that is not making Google money (although we would still be paying for Dataflow if it worked).

@Abacn
Contributor

Abacn commented Feb 4, 2023

Hi @alexmreis, sorry if there is any misunderstanding. #22809 was closed because the issue on the KafkaIO side was fixed by #24205 (its comment closes #22809: #24205 (comment)). That said, the Dataflow Runner use case should be fixed in the upcoming Beam v2.45.0.

The remaining issues on the portable runners (Flink, streaming direct runner) are not limited to the Kafka source; they affect all "splittable DoFn" streaming sources. This functionality is not yet supported by the portable runners (#20979). I also got bitten by this issue quite often (when validating the fix of #24205; see the comments I left on #22809). The gap between the Dataflow and local runners is definitely an important thing to improve, and it has a direct impact on developers.

Besides, having no unit tests in the Python Kafka IO is intended. Within the cross-language framework, the code running the Kafka read is Java's KafkaIO, and the unit tests are exercised there. We have Cross-Language Validates Runner (XVR) tests for each xlang IO and each SDK, run on a schedule, and I recently added a Python KafkaIO performance test as well. That said, KafkaIO in both Java and Python is a priority for our team.

@hadikoub

hadikoub commented Mar 2, 2023

Was this issue addressed in the new version 2.45?

@Abacn
Contributor

Abacn commented Mar 2, 2023

Was this issue addressed in the new version 2.45?

Not yet. This is a feature gap in the portable runner and may need substantial effort. I am trying to work on it currently, though.

@jihad-akl
Author

Any update for this issue in version 2.47?

@jihad-akl
Author

Any update for this issue in version 2.48?

@jihad-akl
Author

Was this issue addressed in the new version 2.45?

Not yet. This is a feature gap in the portable runner and may need substantial effort. I am trying to work on it currently, though.

Any update?

@Abacn
Contributor

Abacn commented Jun 2, 2023

Not able to get into this.

@jihad-akl
Author

Any update for this issue in version 2.49?

@jihad-akl
Author

Not able to get into this.

Any idea where the problem is, so I can try to make a workaround?

@jihad-akl
Author

Any news for version 2.50?

@jihad-akl
Author

Almost a year, and I still have not gotten a clear response on whether this bug will be fixed; seven versions from 2.44 to 2.51 and the bug remains.
Will this bug be fixed?

@MrYanMYN

For anybody stumbling upon this issue: a year later, this bug is still present.

@liferoad
Collaborator

There is no plan to fix this for the Python DirectRunner. We are moving to the Prism runner (#29650). The goal is to make it the default local runner for all SDKs, so users can do local testing and development with it. This work is currently ongoing.

@kennknowles FYI. For Beam on Flink.
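
For context, a sketch of selecting the Prism runner mentioned above from Python, assuming a Beam version that ships it; its support for this streaming Kafka case is not stated in the comment:

from apache_beam.options.pipeline_options import PipelineOptions

# Hedged sketch: pick the Prism runner (#29650) as the local runner. Availability
# and streaming Kafka support depend on the Beam version in use.
beam_options = PipelineOptions(["--runner=PrismRunner", "--streaming"])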
