diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index 5c763743f411..b7ea07734a08 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -63,6 +63,7 @@
 
 _DEFAULT_SIZE_FLUSH_THRESHOLD = 10 << 20  # 10MB
 _DEFAULT_TIME_FLUSH_THRESHOLD_MS = 0  # disable time-based flush by default
+_FLUSH_MAX_SIZE = (2 << 30) - 100  # 2GB less some overhead, protobuf/grpc limit
 
 # Keep a set of completed instructions to discard late received data. The set
 # can have up to _MAX_CLEANED_INSTRUCTIONS items. See _GrpcDataChannel.
@@ -147,6 +148,14 @@ def maybe_flush(self):
 
   def flush(self):
     # type: () -> None
     if self._flush_callback:
+      if self.size() > _FLUSH_MAX_SIZE:
+        raise ValueError(
+            f'Buffer size {self.size()} exceeds GRPC limit {_FLUSH_MAX_SIZE}. '
+            'This is likely due to a single element that is too large. '
+            'To resolve, prefer multiple small elements over single large '
+            'elements in PCollections. If needed, store large blobs in '
+            'external storage systems, and use PCollections to pass their '
+            'metadata, or use a custom coder that reduces the element\'s size.')
       self._flush_callback(self.get())
       self._clear()