From 0708409bdeee10ff60485dab831bf0a258652e20 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Wed, 9 Oct 2024 15:32:12 -0600 Subject: [PATCH] optimize by using extra flag --- google/api_core/grpc_helpers.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py index 942b05f8..024bbb00 100644 --- a/google/api_core/grpc_helpers.py +++ b/google/api_core/grpc_helpers.py @@ -83,6 +83,7 @@ def error_remapped_callable(*args, **kwargs): class _StreamingResponseIterator(Generic[P], grpc.Call): def __init__(self, wrapped, prefetch_first_result=True): self._wrapped = wrapped + self._has_stored_first_result = False # This iterator is used in a retry context, and returned outside after init. # gRPC will not throw an exception until the stream is consumed, so we need @@ -90,6 +91,7 @@ def __init__(self, wrapped, prefetch_first_result=True): try: if prefetch_first_result: self._stored_first_result = next(self._wrapped) + self._has_stored_first_result = True except TypeError: # It is possible the wrapped method isn't an iterable (a grpc.Call # for instance). If this happens don't store the first result. @@ -100,6 +102,7 @@ def __init__(self, wrapped, prefetch_first_result=True): except grpc.RpcError as exc: # If the pre-fetch fails, store exception to be raised on next() call. self._stored_first_error = exc + self._has_stored_first_result = True def __iter__(self) -> Iterator[P]: """This iterator is also an iterable that returns itself.""" @@ -112,14 +115,17 @@ def __next__(self) -> P: protobuf.Message: A single response from the stream. """ try: - if hasattr(self, "_stored_first_result"): - result = self._stored_first_result - del self._stored_first_result - return result - elif hasattr(self, "_stored_first_error"): - exc = self._stored_first_error - del self._stored_first_error - raise exc + if self._has_stored_first_result: + # if we have cached results, return or raise them on first call. + self._has_stored_first_result = False + if hasattr(self, "_stored_first_result"): + result = self._stored_first_result + del self._stored_first_result + return result + if hasattr(self, "_stored_first_error"): + err = self._stored_first_error + del self._stored_first_error + raise err return next(self._wrapped) except grpc.RpcError as exc: # If the stream has already returned data, we cannot recover here.