Skip to content

Commit

Permalink
Merge branch 'main' into parthea-patch-2
Browse files Browse the repository at this point in the history
  • Loading branch information
ohmayr authored Oct 21, 2024
2 parents 158ed95 + 1695119 commit cf04598
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 53 deletions.
80 changes: 36 additions & 44 deletions google/api_core/gapic_v1/method.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,6 @@ class _MethodDefault(enum.Enum):
so the default should be used."""


def _is_not_none_or_false(value):
return value is not None and value is not False


def _apply_decorators(func, decorators):
"""Apply a list of decorators to a given function.
``decorators`` may contain items that are ``None`` or ``False`` which will
be ignored.
"""
filtered_decorators = filter(_is_not_none_or_false, reversed(decorators))

for decorator in filtered_decorators:
func = decorator(func)

return func


class _GapicCallable(object):
"""Callable that applies retry, timeout, and metadata logic.
Expand Down Expand Up @@ -91,44 +73,53 @@ def __init__(
):
self._target = target
self._retry = retry
if isinstance(timeout, (int, float)):
timeout = TimeToDeadlineTimeout(timeout=timeout)
self._timeout = timeout
self._compression = compression
self._metadata = metadata
self._metadata = list(metadata) if metadata is not None else None

def __call__(
self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs
):
"""Invoke the low-level RPC with retry, timeout, compression, and metadata."""

if retry is DEFAULT:
retry = self._retry

if timeout is DEFAULT:
timeout = self._timeout

if compression is DEFAULT:
compression = self._compression

if isinstance(timeout, (int, float)):
timeout = TimeToDeadlineTimeout(timeout=timeout)

# Apply all applicable decorators.
wrapped_func = _apply_decorators(self._target, [retry, timeout])
if compression is not None:
kwargs["compression"] = compression

# Add the user agent metadata to the call.
if self._metadata is not None:
metadata = kwargs.get("metadata", [])
# Due to the nature of invocation, None should be treated the same
# as not specified.
if metadata is None:
metadata = []
metadata = list(metadata)
metadata.extend(self._metadata)
kwargs["metadata"] = metadata
if self._compression is not None:
kwargs["compression"] = compression
try:
# attempt to concatenate default metadata with user-provided metadata
kwargs["metadata"] = [*kwargs["metadata"], *self._metadata]
except (KeyError, TypeError):
# if metadata is not provided, use just the default metadata
kwargs["metadata"] = self._metadata

call = self._build_wrapped_call(timeout, retry)
return call(*args, **kwargs)

@functools.lru_cache(maxsize=4)
def _build_wrapped_call(self, timeout, retry):
"""
Build a wrapped callable that applies retry, timeout, and metadata logic.
"""
wrapped_func = self._target
if timeout is DEFAULT:
timeout = self._timeout
elif isinstance(timeout, (int, float)):
timeout = TimeToDeadlineTimeout(timeout=timeout)
if timeout is not None:
wrapped_func = timeout(wrapped_func)

if retry is DEFAULT:
retry = self._retry
if retry is not None:
wrapped_func = retry(wrapped_func)

return wrapped_func(*args, **kwargs)
return wrapped_func


def wrap_method(
Expand Down Expand Up @@ -202,8 +193,9 @@ def get_topic(name, timeout=None):
Args:
func (Callable[Any]): The function to wrap. It should accept an
optional ``timeout`` argument. If ``metadata`` is not ``None``, it
should accept a ``metadata`` argument.
optional ``timeout`` (google.api_core.timeout.Timeout) argument.
If ``metadata`` is not ``None``, it should accept a ``metadata``
(Sequence[Tuple[str, str]]) argument.
default_retry (Optional[google.api_core.Retry]): The default retry
strategy. If ``None``, the method will not retry by default.
default_timeout (Optional[google.api_core.Timeout]): The default
Expand Down
7 changes: 3 additions & 4 deletions google/api_core/retry/retry_unary.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,7 @@ def retry_target(

for sleep in sleep_generator:
try:
result = target()
if inspect.isawaitable(result):
warnings.warn(_ASYNC_RETRY_WARNING)
return result
return target()

# pylint: disable=broad-except
# This function explicitly must deal with broad exceptions.
Expand Down Expand Up @@ -280,6 +277,8 @@ def __call__(
Callable: A callable that will invoke ``func`` with retry
behavior.
"""
if inspect.iscoroutinefunction(func):
warnings.warn(_ASYNC_RETRY_WARNING)
if self._on_error is not None:
on_error = self._on_error

Expand Down
21 changes: 21 additions & 0 deletions tests/unit/gapic/test_method.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,24 @@ def test_wrap_method_with_call_not_supported():
with pytest.raises(ValueError) as exc_info:
google.api_core.gapic_v1.method.wrap_method(method, with_call=True)
assert "with_call=True is only supported for unary calls" in str(exc_info.value)


def test_benchmark_gapic_call():
"""
Ensure the __call__ method performance does not regress from expectations
__call__ builds a new wrapped function using passed-in timeout and retry, but
subsequent calls are cached
Note: The threshold has been tuned for the CI workers. Test may flake on
slower hardware
"""
from google.api_core.gapic_v1.method import _GapicCallable
from google.api_core.retry import Retry
from timeit import timeit

gapic_callable = _GapicCallable(
lambda *a, **k: 1, retry=Retry(), timeout=1010, compression=False
)
avg_time = timeit(lambda: gapic_callable(), number=10_000)
assert avg_time < 0.4
16 changes: 11 additions & 5 deletions tests/unit/retry/test_retry_unary.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,20 @@ def test_retry_target_non_retryable_error(utcnow, sleep):
)
@pytest.mark.asyncio
async def test_retry_target_warning_for_retry(utcnow, sleep):
predicate = retry.if_exception_type(ValueError)
target = mock.AsyncMock(spec=["__call__"])
"""
retry.Retry should raise warning when wrapping an async function.
"""

async def target():
pass # pragma: NO COVER

retry_obj = retry.Retry()

with pytest.warns(Warning) as exc_info:
# Note: predicate is just a filler and doesn't affect the test
retry.retry_target(target, predicate, range(10), None)
# raise warning when wrapping an async function
retry_obj(target)

assert len(exc_info) == 2
assert len(exc_info) == 1
assert str(exc_info[0].message) == retry.retry_unary._ASYNC_RETRY_WARNING
sleep.assert_not_called()

Expand Down

0 comments on commit cf04598

Please sign in to comment.