From c291f4f8d3e5c17330a2740749801917267f83a4 Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Wed, 12 Jun 2024 15:03:26 -0700 Subject: [PATCH 1/2] Limit the size of bundles of elements emitted by SDK into the data output stream. --- sdks/python/apache_beam/runners/worker/data_plane.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 3dd6bdbe9ae2..59f95d53ae33 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -639,8 +639,12 @@ def _write_outputs(self): streams = [self._to_send.get()] try: # Coalesce up to 100 other items. - for _ in range(100): - streams.append(self._to_send.get_nowait()) + total_size_bytes = streams[0].ByteSize() + while (total_size_bytes < _DEFAULT_SIZE_FLUSH_THRESHOLD and + len(streams) <= 100): + data_or_timer = self._to_send.get_nowait() + total_size_bytes += data_or_timer.ByteSize() + streams.append(data_or_timer) except queue.Empty: pass if streams[-1] is self._WRITES_FINISHED: From d7da9d318aae85a777c5bdef63ea22e3135b372c Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Thu, 13 Jun 2024 10:35:55 -0700 Subject: [PATCH 2/2] Use a type-compliant sentinel. --- sdks/python/apache_beam/runners/worker/data_plane.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 59f95d53ae33..2f9de24594b2 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -452,7 +452,7 @@ def close(self): class _GrpcDataChannel(DataChannel): """Base class for implementing a BeamFnData-based DataChannel.""" - _WRITES_FINISHED = object() + _WRITES_FINISHED = beam_fn_api_pb2.Elements.Data() def __init__(self, data_buffer_time_limit_ms=0): # type: (int) -> None @@ -475,7 +475,7 @@ def __init__(self, data_buffer_time_limit_ms=0): def close(self): # type: () -> None - self._to_send.put(self._WRITES_FINISHED) # type: ignore[arg-type] + self._to_send.put(self._WRITES_FINISHED) self._closed = True def wait(self, timeout=None):