Skip to content

Commit

Permalink
optimize by using extra flag
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Oct 9, 2024
1 parent bfea5ae commit 0708409
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions google/api_core/grpc_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@ def error_remapped_callable(*args, **kwargs):
class _StreamingResponseIterator(Generic[P], grpc.Call):
def __init__(self, wrapped, prefetch_first_result=True):
self._wrapped = wrapped
self._has_stored_first_result = False

# This iterator is used in a retry context, and returned outside after init.
# gRPC will not throw an exception until the stream is consumed, so we need
# to retrieve the first result, in order to fail, in order to trigger a retry.
try:
if prefetch_first_result:
self._stored_first_result = next(self._wrapped)
self._has_stored_first_result = True
except TypeError:
# It is possible the wrapped method isn't an iterable (a grpc.Call
# for instance). If this happens don't store the first result.
Expand All @@ -100,6 +102,7 @@ def __init__(self, wrapped, prefetch_first_result=True):
except grpc.RpcError as exc:
# If the pre-fetch fails, store exception to be raised on next() call.
self._stored_first_error = exc
self._has_stored_first_result = True

def __iter__(self) -> Iterator[P]:
"""This iterator is also an iterable that returns itself."""
Expand All @@ -112,14 +115,17 @@ def __next__(self) -> P:
protobuf.Message: A single response from the stream.
"""
try:
if hasattr(self, "_stored_first_result"):
result = self._stored_first_result
del self._stored_first_result
return result
elif hasattr(self, "_stored_first_error"):
exc = self._stored_first_error
del self._stored_first_error
raise exc
if self._has_stored_first_result:
# if we have cached results, return or raise them on first call.
self._has_stored_first_result = False
if hasattr(self, "_stored_first_result"):
result = self._stored_first_result
del self._stored_first_result
return result
if hasattr(self, "_stored_first_error"):
err = self._stored_first_error
del self._stored_first_error
raise err
return next(self._wrapped)
except grpc.RpcError as exc:
# If the stream has already returned data, we cannot recover here.
Expand Down

0 comments on commit 0708409

Please sign in to comment.