Hi Jaehyeon,
I am trying to build a stream processing pipeline consisting of multiple dependent processes. I want to sink data from one datastream and use it as the source for another datastream, and I am using KafkaSource and KafkaSink for this purpose. I have modified your sink file to test some of my use cases, and it errors out for a reason I am not able to figure out.
This is a small modification I am trying to add: I create a new datastream through an operator and then attach a sink to it. (Note: the same sink, when applied directly to skyone_stream, works perfectly fine.)
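For context, here is a minimal sketch of the pattern I mean (the broker address, topic names, group id, and the map function are placeholders rather than my exact code; the Kafka connector jar is registered via env.add_jars as in your original file):

```python
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaOffsetsInitializer,
    KafkaRecordSerializationSchema,
    KafkaSink,
    KafkaSource,
)

env = StreamExecutionEnvironment.get_execution_environment()

# Source: read string records from the upstream topic.
source = (
    KafkaSource.builder()
    .set_bootstrap_servers("localhost:9092")
    .set_topics("skyone")
    .set_group_id("demo-group")
    .set_starting_offsets(KafkaOffsetsInitializer.earliest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build()
)
skyone_stream = env.from_source(
    source, WatermarkStrategy.no_watermarks(), "skyone-source"
)

# The modification: derive a new datastream through an operator ...
derived_stream = skyone_stream.map(lambda v: v.upper())

# ... and attach a Kafka sink to the derived stream.
sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("localhost:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("downstream-topic")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)
derived_stream.sink_to(sink)

env.execute("kafka-to-kafka")
```

With the sink attached to the derived stream instead of skyone_stream, the job fails with the error below: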
```
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.CANCELLED
    details = "Multiplexer hanging up"
    debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:39377 {created_time:"2024-08-16T18:31:47.719402303+05:30", grpc_status:1, grpc_message:"Multiplexer hanging up"}"
>
Traceback (most recent call last):
  File "/home/chinmayp/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 505, in input_elements
    element = received.get(timeout=1)
  File "/usr/lib/python3.10/queue.py", line 179, in get
    raise Empty
_queue.Empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/chinmayp/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 295, in _execute
    response = task()
  File "/home/chinmayp/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 370, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/home/chinmayp/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 629, in do_instruction
    return getattr(self, request_type)(
  File "/home/chinmayp/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 667, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/home/chinmayp/.local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1050, in process_bundle
    for element in data_channel.input_elements(instruction_id,
  File "/home/chinmayp/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 508, in input_elements
    raise RuntimeError('Channel closed prematurely.')
RuntimeError: Channel closed prematurely.
```
This is a major blocker, as I am not able to pass the data on to the other processes. Any help would be highly appreciated.
Thank you
I am following the same file, but I require some case-specific functionality. As far as I have been able to trace the bug, the issue lies with the serialization schema I am using for the KafkaSink. I think that once a datastream is processed through any function, the type of the data in the stream changes through deserialization. Can you help me understand how to deal with the serialization and deserialization schemas? It is difficult for me because the error occurs in some Java functionality that I am unable to decode from Python. Any guidance would help a lot.
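For example (and this is only my reading of the PyFlink docs, not a confirmed diagnosis): a Python operator such as map falls back to pickled byte type information unless output_type is declared, while SimpleStringSchema in the sink's record serializer expects string records, so the two no longer match once the stream passes through my function. Continuing the sketch from my first message, this is the workaround I am experimenting with:

```python
from pyflink.common import Types

# Declare the operator's produced type explicitly so that the derived
# stream's type information matches the sink's value serialization schema
# (SimpleStringSchema expects strings; without output_type the stream
# carries pickled bytes instead).
derived_stream = skyone_stream.map(
    lambda v: v.upper(),
    output_type=Types.STRING(),
)
derived_stream.sink_to(sink)
```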
Thank you