From 3aa78d21f70eb2e4236ac86281cf037896e6faed Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 15 Mar 2024 08:40:44 -0700 Subject: [PATCH] Better error message for large elements. (#30639) This will cause an exception when the too-large element is emitted, rather than later when the proto is serialized (which happens on another thread and may also cause spurious errors in the data channel consumption). --------- Co-authored-by: tvalentyn --- sdks/python/apache_beam/runners/worker/data_plane.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 5c763743f411..b7ea07734a08 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -63,6 +63,7 @@ _DEFAULT_SIZE_FLUSH_THRESHOLD = 10 << 20 # 10MB _DEFAULT_TIME_FLUSH_THRESHOLD_MS = 0 # disable time-based flush by default +_FLUSH_MAX_SIZE = (2 << 30) - 100 # 2GB less some overhead, protobuf/grpc limit # Keep a set of completed instructions to discard late received data. The set # can have up to _MAX_CLEANED_INSTRUCTIONS items. See _GrpcDataChannel. @@ -147,6 +148,14 @@ def maybe_flush(self): def flush(self): # type: () -> None if self._flush_callback: + if self.size() > _FLUSH_MAX_SIZE: + raise ValueError( + f'Buffer size {self.size()} exceeds GRPC limit {_FLUSH_MAX_SIZE}. ' + 'This is likely due to a single element that is too large. ' + 'To resolve, prefer multiple small elements over single large ' + 'elements in PCollections. If needed, store large blobs in ' + 'external storage systems, and use PCollections to pass their ' + 'metadata, or use a custom coder that reduces the element\'s size.') self._flush_callback(self.get()) self._clear()