Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: retry and retry_async support streaming rpcs #495

Merged
merged 220 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 83 commits
Commits
Show all changes
220 commits
Select commit Hold shift + click to select a range
953106a
got retryable generators partially working
daniel-sanche Feb 8, 2023
89aeb75
added retrun statement
daniel-sanche Feb 8, 2023
27feb80
refactoring
daniel-sanche Feb 8, 2023
0dffa6d
work for now deadline
daniel-sanche Feb 8, 2023
b330c3b
improved synchronous generator retry
daniel-sanche Feb 8, 2023
67ceaa2
handle closing and returns
daniel-sanche Feb 10, 2023
ee2647a
got test to pass
daniel-sanche Feb 10, 2023
5a5396c
restructured test
daniel-sanche Feb 10, 2023
7afa76b
added tests
daniel-sanche Feb 10, 2023
2d91ade
refactoring and comments in retry code
daniel-sanche Feb 10, 2023
0cd384e
fixed helper; added is_generator flag
daniel-sanche Feb 11, 2023
f72bbec
got first test working
daniel-sanche Feb 11, 2023
88eed5c
remove extra await in front of async generator
daniel-sanche Feb 11, 2023
91f9cc4
implemented async generator retry test
daniel-sanche Feb 11, 2023
c3eb997
fixed is_generator
daniel-sanche Feb 11, 2023
f6c6201
added tests for aclose and athrow
daniel-sanche Feb 11, 2023
57b0ee3
simplified close; don't support throws
daniel-sanche Feb 11, 2023
e814ce7
added tests
daniel-sanche Feb 11, 2023
0ffb03f
have test that throw should retry
daniel-sanche Feb 11, 2023
a8024f3
improved aclose and athrow
daniel-sanche Feb 11, 2023
c76f641
added comments
daniel-sanche Feb 11, 2023
ee631e3
close synchronous generator
daniel-sanche Feb 11, 2023
70eb78c
refactor async file
daniel-sanche Feb 11, 2023
42ee132
ran blacken
daniel-sanche Feb 11, 2023
102d83b
improved send test
daniel-sanche Feb 11, 2023
f029dbd
improved comments
daniel-sanche Feb 11, 2023
c83c62a
got send working
daniel-sanche Feb 11, 2023
185826c
tested deadline handling
daniel-sanche Feb 11, 2023
c5f7bbe
changed timeout to only count time awaiting or sleeping
daniel-sanche Feb 13, 2023
4242036
improved comments
daniel-sanche Feb 14, 2023
9c4799c
added test for cancellation
daniel-sanche Feb 14, 2023
0bd6cab
improved comments
daniel-sanche Feb 14, 2023
67aeeaf
on_error can yield into the generator stream
daniel-sanche Feb 14, 2023
985b13a
Merge branch 'main' into retry_generators
daniel-sanche Apr 3, 2023
0ea8297
added filter_func to retryable generator
daniel-sanche Apr 4, 2023
b952652
fixed error in time budget calculation
daniel-sanche Apr 6, 2023
6cb3e2d
added from field to raised timeout exception
daniel-sanche Apr 6, 2023
99da116
removed filter_fn
daniel-sanche Apr 6, 2023
7f862d0
ran blacken
daniel-sanche Apr 6, 2023
04a4a69
removed generator auto-detection
daniel-sanche Apr 7, 2023
d20cf08
fixed tests and lint
daniel-sanche Apr 7, 2023
183c221
changed comments
daniel-sanche Apr 7, 2023
d2217e4
fixed 3.11 failed test
daniel-sanche Apr 7, 2023
d4a9d30
added comments
daniel-sanche Apr 7, 2023
06d45cc
made streaming retries into a custom generator object
daniel-sanche Apr 13, 2023
de41a14
added tests for iterators
daniel-sanche Apr 13, 2023
dcb3766
added test for non-awaitable target
daniel-sanche Apr 13, 2023
dd368e4
changed is_generator to is_stream
daniel-sanche Apr 13, 2023
452b9bb
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 13, 2023
6879418
changed docstrings
daniel-sanche Apr 13, 2023
847509f
removed back-ticks
daniel-sanche Apr 13, 2023
b5e3796
removed outdated comment
daniel-sanche Apr 13, 2023
7a7d9ac
changed comments
daniel-sanche Apr 14, 2023
6619895
moved streaming retries to new files
daniel-sanche Apr 14, 2023
27fc930
reverted some style changes
daniel-sanche Apr 14, 2023
d6a23ea
changed comments
daniel-sanche Apr 14, 2023
90ef834
added comments
daniel-sanche Apr 14, 2023
6201db6
refactoring and commenting
daniel-sanche Apr 14, 2023
61ce3a7
blacken/mypy fixes
daniel-sanche Apr 14, 2023
69149a1
fixed issue with py37
daniel-sanche Apr 14, 2023
d63871e
added tests for bad sleep generators
daniel-sanche Apr 14, 2023
773e033
improved test_retry coverage
daniel-sanche Apr 14, 2023
d1def5d
improved async test coverage
daniel-sanche Apr 14, 2023
cbaaa1d
added test for calling next on exhausted generator
daniel-sanche Apr 14, 2023
21a863f
fixed lint issue
daniel-sanche Apr 14, 2023
878ddfb
changed docstring
daniel-sanche Apr 14, 2023
7b0a600
changed docstrings
daniel-sanche Apr 14, 2023
0188228
updated comments
daniel-sanche Apr 14, 2023
902a4ab
updated comments
daniel-sanche Apr 14, 2023
74f3f3e
fixed send and asend retry logic
daniel-sanche Apr 14, 2023
e506aad
update test error string
daniel-sanche Apr 19, 2023
5baa2aa
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 19, 2023
5c3805d
improved type hinting
daniel-sanche Apr 19, 2023
265d998
improved test docs
daniel-sanche Apr 19, 2023
0423ebe
fixed mypy issues
daniel-sanche Apr 20, 2023
c4049f5
Merge branch 'main' into retry_generators
daniel-sanche Apr 21, 2023
acd6546
remove wait_for in async streaming for perf reasons
daniel-sanche May 8, 2023
b1ad4b3
fixed style issues
daniel-sanche May 8, 2023
8dcf67c
fixed callable type annotation
daniel-sanche May 10, 2023
6104c59
change time calculations
daniel-sanche May 12, 2023
43d0913
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] May 12, 2023
9ba7676
simplified retry_streaming_async to use wall time instead of cpu time
daniel-sanche May 19, 2023
14c195c
Merge branch 'main' into retry_generators
daniel-sanche Jun 16, 2023
de7b51a
removed extra CancelledError handling
daniel-sanche Jun 17, 2023
4cdee6b
improved docstrings
daniel-sanche Jun 20, 2023
a526d65
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jun 20, 2023
ee2bbdd
Merge branch 'main' into retry_generators
daniel-sanche Jul 17, 2023
5f82355
swapped out utcnow for more performant time.monotonic
daniel-sanche Jul 28, 2023
9900c40
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 28, 2023
2c2dcbe
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 28, 2023
3340399
Merge branch 'retry_generators' of https://github.com/googleapis/pyth…
gcf-owl-bot[bot] Jul 28, 2023
de07714
Merge branch 'main' into retry_generators
parthea Aug 7, 2023
67068ac
don't check timeout on each yield by default
daniel-sanche Aug 14, 2023
54325bc
added exception building logic
daniel-sanche Aug 15, 2023
bafa18b
added type hint to check_timeout_on_yield
daniel-sanche Aug 15, 2023
2ae2a32
simplified ensure_tareget; fixed mypy issues
daniel-sanche Aug 15, 2023
9cadd63
don't check timeout on each yield by default
daniel-sanche Aug 14, 2023
c9ef1d5
added exception building logic
daniel-sanche Aug 15, 2023
41c7868
added type hint to check_timeout_on_yield
daniel-sanche Aug 15, 2023
30fccb9
simplified ensure_tareget; fixed mypy issues
daniel-sanche Aug 15, 2023
a2b0e6c
remove iteration helper
daniel-sanche Aug 15, 2023
4aa1ab4
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 15, 2023
8349424
added test coverage for send/asend
daniel-sanche Aug 15, 2023
ece5cf8
Merge branch 'retry_generators_2' into retry_generators
daniel-sanche Aug 15, 2023
5ddda24
added test for closing new generator
daniel-sanche Aug 15, 2023
9e3ea92
improved test decorators
daniel-sanche Aug 15, 2023
3b06b3a
swapped out generator object with generator function
daniel-sanche Aug 15, 2023
8bb6b0c
support iterators, along with generators
daniel-sanche Aug 15, 2023
37c64a0
got tests passing with new structure
daniel-sanche Aug 15, 2023
cee0028
replaces sync streaming retries object with generator function
daniel-sanche Aug 15, 2023
3a7e5fa
removed timeout on yield functionality
daniel-sanche Aug 15, 2023
ba6dc9f
fixed comments
daniel-sanche Aug 15, 2023
0500b8b
fixed mypy issues
daniel-sanche Aug 15, 2023
1ccadb1
fixed issue with py310
daniel-sanche Aug 15, 2023
c312262
renamed streaming retry function
daniel-sanche Aug 15, 2023
1fe57e0
removed unneeded functions
daniel-sanche Aug 15, 2023
4f09f29
simplified some test functions
daniel-sanche Aug 15, 2023
06824b9
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Aug 15, 2023
343157b
removed unneeded test variable
daniel-sanche Aug 15, 2023
93f82cc
improved documentation
daniel-sanche Aug 16, 2023
0915ca0
Merge branch 'main' into retry_generators
parthea Sep 1, 2023
61e5ab5
fixed type hinting issues
daniel-sanche Sep 1, 2023
51c125b
fixed undefined name issues
daniel-sanche Sep 1, 2023
02604bc
fixed lint issues
daniel-sanche Sep 1, 2023
6269db2
update comment
daniel-sanche Sep 1, 2023
0dcd0de
fix typo
daniel-sanche Sep 1, 2023
54e9c81
Update google/api_core/retry_streaming.py
daniel-sanche Sep 1, 2023
2342910
added comment to on_error
daniel-sanche Sep 1, 2023
eada0d7
fixed indentation
daniel-sanche Sep 1, 2023
ae2bf37
improved sample
daniel-sanche Sep 1, 2023
c8a4f26
improved default exception factory
daniel-sanche Sep 1, 2023
2840b9f
added pylint disable line
daniel-sanche Sep 1, 2023
82274a3
cleaned up async retry wrapping
daniel-sanche Sep 1, 2023
1594a17
improved sample
daniel-sanche Sep 1, 2023
9b0ddb0
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 1, 2023
8985127
remove extra generator close line
daniel-sanche Sep 1, 2023
60b20ab
added missing test
daniel-sanche Sep 1, 2023
237ca3d
AsyncRetry adds a coroutine in front of async streams
daniel-sanche Sep 12, 2023
a46c0f7
improved type checking
daniel-sanche Sep 12, 2023
93727b7
Merge branch 'main' into retry_generators
daniel-sanche Sep 12, 2023
796ae52
fixed typing issues
daniel-sanche Sep 12, 2023
0688ffe
moved docstrings
daniel-sanche Sep 21, 2023
da048ab
use enum in exception builder
daniel-sanche Sep 21, 2023
80e5eb0
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 21, 2023
562079b
fixed lint and docs issues
daniel-sanche Sep 21, 2023
a0fecc5
Merge branch 'main' into retry_generators
daniel-sanche Oct 3, 2023
8cc6ea9
Update tests/unit/test_retry.py
daniel-sanche Oct 6, 2023
e7a5cd4
fixed comment line break
daniel-sanche Oct 6, 2023
02c12cc
use kwargs map
daniel-sanche Oct 6, 2023
03b1608
fixed on_error docstrings
daniel-sanche Oct 6, 2023
b05b11f
renamed example lists
daniel-sanche Oct 6, 2023
0b5d3a2
removed ignore_sent
daniel-sanche Oct 6, 2023
03f2af5
fixed lint issues
daniel-sanche Oct 6, 2023
5fee888
fixed generator mock and added comments
daniel-sanche Oct 6, 2023
239ed7d
Merge branch 'main' into retry_generators
daniel-sanche Oct 6, 2023
94eb0f5
Merge branch 'main' into retry_generators
daniel-sanche Oct 17, 2023
7d1e246
Merge branch 'main' into retry_generators
parthea Nov 8, 2023
b0faa2d
Apply suggestions from code review
daniel-sanche Nov 9, 2023
6c44298
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Nov 9, 2023
51df672
Update google/api_core/retry.py
daniel-sanche Nov 9, 2023
e207376
removed unneeded comments
daniel-sanche Nov 9, 2023
39716a7
improved comments
daniel-sanche Nov 9, 2023
2bbf33f
simplified generator detection
daniel-sanche Nov 9, 2023
3b03bfa
renamed variables
daniel-sanche Nov 9, 2023
e63701d
improved comments
daniel-sanche Nov 9, 2023
c101ea6
renamed variable
daniel-sanche Nov 9, 2023
3642d74
fixed tests
daniel-sanche Nov 9, 2023
34cfa08
improved comments
daniel-sanche Nov 9, 2023
583181d
Merge branch 'main' into retry_generators
daniel-sanche Nov 17, 2023
b311b87
fixed retry factory functionality
daniel-sanche Nov 18, 2023
19a998d
created new objects for streaming retry config
daniel-sanche Nov 20, 2023
5637e88
added typing to base retry
daniel-sanche Nov 20, 2023
c4be5f2
share base retry logic
daniel-sanche Nov 20, 2023
4d9e762
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Nov 20, 2023
2e9e84b
lint and mypy cleanup
daniel-sanche Nov 20, 2023
d183a7e
removed unneeded changes
daniel-sanche Nov 21, 2023
e2d9c9c
pass in initial args and kwargs to retry_target_stream
daniel-sanche Nov 21, 2023
4543106
uncommented functools.wraps
daniel-sanche Nov 21, 2023
d791aad
Merge branch 'main' into retry_generators
daniel-sanche Nov 21, 2023
638cc68
change enum encoding
daniel-sanche Nov 30, 2023
f7b1e14
moved base retry into own file
daniel-sanche Nov 30, 2023
07db4c2
restructured files
daniel-sanche Nov 30, 2023
d448a52
expose other retry target functions in retry __init__
daniel-sanche Nov 30, 2023
781426a
share a logger
daniel-sanche Nov 30, 2023
4a05404
extracted shared error handling logic
daniel-sanche Dec 1, 2023
b221c8d
added type hints
daniel-sanche Dec 1, 2023
b5b4534
removed costly awaitable check
daniel-sanche Dec 1, 2023
0f1145d
revised docstring
daniel-sanche Dec 1, 2023
8408512
added exception_factory docstrings
daniel-sanche Dec 1, 2023
aa69c56
Revert "removed costly awaitable check"
daniel-sanche Dec 1, 2023
d1ac29d
renamed variable
daniel-sanche Dec 5, 2023
3ab88fc
update docstring
daniel-sanche Dec 5, 2023
382d0e2
add punctuation
daniel-sanche Dec 5, 2023
4258823
punctuation
daniel-sanche Dec 5, 2023
1bc9731
update docstrings
daniel-sanche Dec 5, 2023
aafe057
changed deadline to timeout
daniel-sanche Dec 5, 2023
8095229
updated deadlien to timeout in docstrings
daniel-sanche Dec 5, 2023
de9f518
update docstring
daniel-sanche Dec 5, 2023
7864667
update test comment
daniel-sanche Dec 5, 2023
4c24322
update docstrings
daniel-sanche Dec 5, 2023
7855513
removed unneeded comments
daniel-sanche Dec 5, 2023
f4bfb02
improved docstrings
daniel-sanche Dec 5, 2023
a88cf6f
use timeout in tests
daniel-sanche Dec 5, 2023
b5c62e1
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Dec 5, 2023
852f4f8
moved test to proper place
daniel-sanche Dec 5, 2023
cd8323e
added test comments; fixed asserts
daniel-sanche Dec 5, 2023
ace61eb
use _build_retry_error as default param
daniel-sanche Dec 5, 2023
1bbd1f0
ran blacken
daniel-sanche Dec 5, 2023
35cc00a
added comment to clarify timeouts
daniel-sanche Dec 5, 2023
89abfa4
removed timeout vs deadline explainer from retry_streaming
daniel-sanche Dec 5, 2023
74ab817
remove duplicated test
daniel-sanche Dec 8, 2023
85b3e02
fixed variable name
daniel-sanche Dec 8, 2023
6dbe17d
made build_retry_error public
daniel-sanche Dec 8, 2023
71e5888
changed docstring
daniel-sanche Dec 8, 2023
cbae3d3
import extra helper in retry_unary_async
daniel-sanche Dec 11, 2023
61198b8
Merge branch 'main' into retry_generators
vchudnov-g Dec 11, 2023
acf9752
fix: address backwards compatibility warnings failing presubmits
vchudnov-g Dec 12, 2023
7cf9fbf
fix: address mypy errors
vchudnov-g Dec 12, 2023
f62439a
fix: address coverage and lint issues failing presubmits
vchudnov-g Dec 12, 2023
b7abeca
chore: simplify resolution of backaward-compatibility warnings
vchudnov-g Dec 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions google/api_core/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def check_if_exists():

from google.api_core import datetime_helpers
from google.api_core import exceptions
from google.api_core.retry_streaming import RetryableGenerator
from google.auth import exceptions as auth_exceptions

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -154,7 +155,7 @@ def retry_target(
higher-level retry helper :class:`Retry`.

Args:
target(Callable): The function to call and retry. This must be a
target(Callable[[], Any]): The function to call and retry. This must be a
nullary function - apply arguments with `functools.partial`.
predicate (Callable[Exception]): A callable used to determine if an
exception raised by the target should be considered retryable.
Expand Down Expand Up @@ -301,6 +302,15 @@ class Retry(object):
maximum (float): The maximum amount of time to delay in seconds.
multiplier (float): The multiplier applied to the delay.
timeout (float): How long to keep retrying, in seconds.
on_error (Callable[Exception]): A function to call while processing
vchudnov-g marked this conversation as resolved.
Show resolved Hide resolved
daniel-sanche marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In retry_async.py:AsyncRetry you have different type specifiers:

  • timeout (Optional[float])
  • on_error (Optional[Callable[Exception]])

Should they be the same in both places? And it doesn't seem that the type hints in the __init__s quite match, either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah they probably should, I think I was just avoiding changing too much of the existing comments

If we're going to make a change, maybe we should remove the types from the comments entirely now that we have type annotations, so we don't have to duplicate it in multiple places?

a retryable exception. Any error raised by this function will
*not* be caught.
is_stream (bool): Indicates whether the input function
should be treated as an stream function (i.e. a Generator,
daniel-sanche marked this conversation as resolved.
Show resolved Hide resolved
or function that returns an Iterable). If True, the iterable
will be wrapped with retry logic, and any failed outputs will
restart the stream. If False, only the input function call itself
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Restart the stream from the last non-failed output, right ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately that's not really something we can do in a general way. We can create a new generator and start yielding from it again, but we have no guarantee that it will yield the same sequence as last time. (For example, in Bigtable we will modify the request after a failure to avoid requesting repeat data from the server)

I considered adding a "filter" on top of the generator, and providing a default filter that raises an exception if a retry deviates from the initial stream, which would be one way to do what you requested. But in the end, I decided that it's probably better to keep this code simple and just pass on the generator values directly, and the caller can do their own filtering over top of the retry-stream

Copy link

@dizcology dizcology Jun 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this functionality is not intended for external users directly, but rather for the library generator and handwritten library code, this seems fine. But the generator and library developers need to be aware of this and design their target and on_error callables carefully if they want to accomplish certain behaviors such as the "filter through modified request" approach you described.

Please expand the docstring of RetryableGenerator to provide some guidance and example pattern for the library developers so they do not make false assumptions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I added a much more documentation. Let me know that looks better

So far this will mostly just be useful for hand-written layers. If we want to add this to gapic libraries automatically, we could try to classify the different kinds of streams our libraries return and provide a couple different presets for different kinds of streams. But so far that's been out of scope.

will be retried. Defaults to False.
deadline (float): DEPRECATED: use `timeout` instead. For backward
compatibility, if specified it will override the ``timeout`` parameter.
"""
Expand All @@ -313,7 +323,8 @@ def __init__(
multiplier=_DEFAULT_DELAY_MULTIPLIER,
timeout=_DEFAULT_DEADLINE,
on_error=None,
**kwargs
is_stream=False,
daniel-sanche marked this conversation as resolved.
Show resolved Hide resolved
**kwargs,
):
self._predicate = predicate
self._initial = initial
Expand All @@ -322,6 +333,7 @@ def __init__(
self._timeout = kwargs.get("deadline", timeout)
self._deadline = self._timeout
self._on_error = on_error
self._is_stream = is_stream

def __call__(self, func, on_error=None):
"""Wrap a callable with retry behavior.
Expand All @@ -346,7 +358,8 @@ def retry_wrapped_func(*args, **kwargs):
sleep_generator = exponential_sleep_generator(
self._initial, self._maximum, multiplier=self._multiplier
)
return retry_target(
retry_func = RetryableGenerator if self._is_stream else retry_target
return retry_func(
target,
self._predicate,
sleep_generator,
Expand Down
41 changes: 35 additions & 6 deletions google/api_core/retry_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async def check_if_exists():
from google.api_core.retry import exponential_sleep_generator
from google.api_core.retry import if_exception_type # noqa: F401
from google.api_core.retry import if_transient_error
from google.api_core.retry_streaming_async import AsyncRetryableGenerator


_LOGGER = logging.getLogger(__name__)
Expand All @@ -74,13 +75,13 @@ async def check_if_exists():
async def retry_target(
target, predicate, sleep_generator, timeout=None, on_error=None, **kwargs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the streaming versions have type declarations. Do you want to have type declarations in the non-stream versions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I was trying to avoid scope growth, but it probably makes sense to add at this point. Done

):
"""Call a function and retry if it fails.
"""Await a coroutine and retry if it fails.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, the sleep-delay logic should be the same between all four versions of this function: syncvs async, unary vs streaming. I would suggest:

  • at the very least making the four functions look as similar as possible with their timeout handling logic. For example, the async versions uses min(), which the sync version probably should as well. The streaming functions use exception_factory; should the unary ones as well, for consistenct?

  • consider whether it's possible and desirable to factor any of the time- and exception-handling functionality into a common function. I suspect the answer is "no" because it would make the code more complex and harder to follow, but it would have the advantage that we'd be using the same logic in the parts where it makes sense. Maybe just factor out the if deadline and _LOGGER at the bottom of the loop into a helper function that all four versions call to increment the sleep consistently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to avoid making changes to the existing retries, but I suppose with the new base classes, it makes sense to generalize them a bit. I added a shared _retry_error_helper function, which either raises an error, or logs the retry message.

Note though that I had to make a (potentially breaking) change to the async unary retry logic to make it consistent though. Previously async_unary would attempt to cancel early when the deadline is reached, while the other retries only check the deadline after the request terminates. The old behavior was very slow (using asyncio.wait_for), and caused race conditions. I'd consier it a bug, but fixing the behavior may count as a breaking change (even though gapic functions shouldn't be affected)


This is the lowest-level retry helper. Generally, you'll use the
higher-level retry helper :class:`Retry`.

Args:
target(Callable): The function to call and retry. This must be a
target(Callable[[], Any]): The function to call and retry. This must be a
nullary function - apply arguments with `functools.partial`.
predicate (Callable[Exception]): A callable used to determine if an
exception raised by the target should be considered retryable.
Expand Down Expand Up @@ -156,7 +157,7 @@ async def retry_target(


class AsyncRetry:
"""Exponential retry decorator for async functions.
"""Exponential retry decorator for async coroutines.

This class is a decorator used to add exponential back-off retry behavior
to an RPC call.
Expand All @@ -172,9 +173,17 @@ class AsyncRetry:
maximum (float): The maximum amount of time to delay in seconds.
multiplier (float): The multiplier applied to the delay.
timeout (float): How long to keep retrying in seconds.
When ``is_stream``, only time spent waiting on the
target or sleeping between retries is counted towards the timeout.
on_error (Callable[Exception]): A function to call while processing
a retryable exception. Any error raised by this function will
*not* be caught.
is_stream (bool): Indicates whether the input function
should be treated as an stream function (i.e. an AsyncGenerator,
daniel-sanche marked this conversation as resolved.
Show resolved Hide resolved
or function or coroutine that returns an AsyncIterable).
If True, the iterable will be wrapped with retry logic, and any
failed outputs will restart the stream. If False, only the input
function call itself will be retried. Defaults to False.
deadline (float): DEPRECATED use ``timeout`` instead. If set it will
override ``timeout`` parameter.
"""
Expand All @@ -187,6 +196,7 @@ def __init__(
multiplier=_DEFAULT_DELAY_MULTIPLIER,
timeout=_DEFAULT_TIMEOUT,
on_error=None,
is_stream=False,
**kwargs
):
self._predicate = predicate
Expand All @@ -196,12 +206,13 @@ def __init__(
self._timeout = kwargs.get("deadline", timeout)
self._deadline = self._timeout
self._on_error = on_error
self._is_stream = is_stream

def __call__(self, func, on_error=None):
"""Wrap a callable with retry behavior.

Args:
func (Callable): The callable to add retry behavior to.
func (Callable): The callable or stream to add retry behavior to.
on_error (Callable[Exception]): A function to call while processing
a retryable exception. Any error raised by this function will
*not* be caught.
Expand All @@ -224,11 +235,29 @@ async def retry_wrapped_func(*args, **kwargs):
target,
self._predicate,
sleep_generator,
self._timeout,
timeout=self._timeout,
on_error=on_error,
)

@functools.wraps(func)
def retry_wrapped_stream(*args, **kwargs):
"""A wrapper that iterates over target stream with retry."""
target = functools.partial(func, *args, **kwargs)
sleep_generator = exponential_sleep_generator(
self._initial, self._maximum, multiplier=self._multiplier
)
return AsyncRetryableGenerator(
target,
self._predicate,
sleep_generator,
timeout=self._timeout,
on_error=on_error,
)

return retry_wrapped_func
if self._is_stream:
Copy link
Contributor

@vchudnov-g vchudnov-g Sep 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't use a function pointer here, like you did in the sync case (retry_func = retry_target_stream if self._is_stream else retry_target), so that we only need one wrapped function instead of two that are almost identical?

NB: I notice that you're making changes to this PR, so I can't find the code that I quoted in the current version, though it is in the PR that I cloned locally

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only reason is because the existing retry_wrapped_func was an async function, and I was trying to add functionality without changing too much of the existing code.

But I just made a change to make retry_wrapped_func into a sync function that returns the retry coroutine directly instead of awaiting it, and that makes things cleaner. So let me know if that works

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the new structure

return retry_wrapped_stream
else:
return retry_wrapped_func

def _replace(
self,
Expand Down
217 changes: 217 additions & 0 deletions google/api_core/retry_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for retries for streaming APIs."""

from typing import Callable, Optional, Iterable, Iterator, Generator, TypeVar, Any, cast

import datetime
import logging
import time

from google.api_core import datetime_helpers
from google.api_core import exceptions

_LOGGER = logging.getLogger(__name__)

T = TypeVar("T")


class RetryableGenerator(Generator[T, Any, None]):
"""
Helper class for retrying Iterator and Generator-based
streaming APIs.
"""

def __init__(
self,
target: Callable[[], Iterable[T]],
predicate: Callable[[Exception], bool],
sleep_generator: Iterable[float],
timeout: Optional[float] = None,
on_error: Optional[Callable[[Exception], None]] = None,
):
"""
Args:
target: The function to call to produce iterables for each retry.
This must be a nullary function - apply arguments with
`functools.partial`.
predicate: A callable used to determine if an
exception raised by the target should be considered retryable.
It should return True to retry or False otherwise.
sleep_generator: An infinite iterator that determines
how long to sleep between retries.
timeout: How long to keep retrying the target, in seconds.
on_error: A function to call while processing a
retryable exception. Any error raised by this function will *not*
be caught.
"""
self.target_fn = target
self.active_target: Iterator[T] = self.target_fn().__iter__()
self.predicate = predicate
self.sleep_generator = iter(sleep_generator)
self.on_error = on_error
self.timeout = timeout
if self.timeout is not None:
self.deadline = datetime_helpers.utcnow() + datetime.timedelta(
seconds=self.timeout
)
else:
self.deadline = None

def __iter__(self) -> Generator[T, Any, None]:
"""
Implement the iterator protocol.
"""
return self

def _handle_exception(self, exc) -> None:
"""
When an exception is raised while iterating over the active_target,
check if it is retryable. If so, create a new active_target and
continue iterating. If not, raise the exception.
"""
if not self.predicate(exc):
raise exc
else:
# run on_error callback if provided
if self.on_error:
self.on_error(exc)
try:
next_sleep = next(self.sleep_generator)
except StopIteration:
raise ValueError("Sleep generator stopped yielding sleep values")
# if deadline is exceeded, raise RetryError
if self.deadline is not None:
next_attempt = datetime_helpers.utcnow() + datetime.timedelta(
seconds=next_sleep
)
self._check_timeout(next_attempt, exc)
# sleep before retrying
_LOGGER.debug(
"Retrying due to {}, sleeping {:.1f}s ...".format(exc, next_sleep)
)
time.sleep(next_sleep)
self.active_target = self.target_fn().__iter__()

def _check_timeout(
self, current_time: float, source_exception: Optional[Exception] = None
) -> None:
"""
Helper function to check if the timeout has been exceeded, and raise a RetryError if so.

Args:
- current_time: the timestamp to check against the deadline
- source_exception: the exception that triggered the timeout check, if any
Raises:
- RetryError if the deadline has been exceeded
"""
if (
self.deadline is not None
and self.timeout is not None
and self.deadline < current_time
):
raise exceptions.RetryError(
"Timeout of {:.1f}s exceeded".format(self.timeout),
source_exception,
) from source_exception

def __next__(self) -> T:
"""
Implement the iterator protocol.

Returns:
- the next value of the active_target iterator
"""
# check for expired timeouts before attempting to iterate
self._check_timeout(datetime_helpers.utcnow())
try:
return next(self.active_target)
except Exception as exc:
daniel-sanche marked this conversation as resolved.
Show resolved Hide resolved
self._handle_exception(exc)
# if retryable exception was handled, try again with new active_target
return self.__next__()

def close(self) -> None:
"""
Close the active_target if supported. (e.g. target is a generator)

Raises:
- AttributeError if the active_target does not have a close() method
"""
if getattr(self.active_target, "close", None):
casted_target = cast(Generator, self.active_target)
return casted_target.close()
else:
raise AttributeError(
"close() not implemented for {}".format(self.active_target)
)

def send(self, *args, **kwargs) -> T:
"""
Call send on the active_target if supported. (e.g. target is a generator)

If an exception is raised, a retry may be attempted before returning
a result.

Args:
- *args: arguments to pass to the wrapped generator's send method
- **kwargs: keyword arguments to pass to the wrapped generator's send method
Returns:
- the next value of the active_target iterator after calling send
Raises:
- AttributeError if the active_target does not have a send() method
"""
# check for expired timeouts before attempting to iterate
self._check_timeout(datetime_helpers.utcnow())
if getattr(self.active_target, "send", None):
casted_target = cast(Generator, self.active_target)
try:
return casted_target.send(*args, **kwargs)
except Exception as exc:
self._handle_exception(exc)
# if exception was retryable, use new target for return value
return self.__next__()
else:
raise AttributeError(
"send() not implemented for {}".format(self.active_target)
)

def throw(self, *args, **kwargs) -> T:
"""
Call throw on the active_target if supported. (e.g. target is a generator)

If an exception is raised, a retry may be attempted before returning
a result.

Args:
- *args: arguments to pass to the wrapped generator's throw method
- **kwargs: keyword arguments to pass to the wrapped generator's throw method
Returns:
- the next vale of the active_target iterator after calling throw
Raises:
- AttributeError if the active_target does not have a throw() method
"""
if getattr(self.active_target, "throw", None):
casted_target = cast(Generator, self.active_target)
try:
return casted_target.throw(*args, **kwargs)
except Exception as exc:
self._handle_exception(exc)
# if retryable exception was handled, return next from new active_target
return self.__next__()
else:
raise AttributeError(
"throw() not implemented for {}".format(self.active_target)
)
Loading