diff --git a/google/cloud/bigtable/data/__init__.py b/google/cloud/bigtable/data/__init__.py index e08993108..15f9bc167 100644 --- a/google/cloud/bigtable/data/__init__.py +++ b/google/cloud/bigtable/data/__init__.py @@ -68,6 +68,8 @@ CrossSync._Sync_Impl.add_mapping("_ReadRowsOperation", _ReadRowsOperation) CrossSync.add_mapping("_MutateRowsOperation", _MutateRowsOperationAsync) CrossSync._Sync_Impl.add_mapping("_MutateRowsOperation", _MutateRowsOperation) +CrossSync.add_mapping("MutationsBatcher", MutationsBatcherAsync) +CrossSync._Sync_Impl.add_mapping("MutationsBatcher", MutationsBatcher) __version__: str = package_version.__version__ diff --git a/google/cloud/bigtable/data/_async/client.py b/google/cloud/bigtable/data/_async/client.py index 180b17079..2b83ef55b 100644 --- a/google/cloud/bigtable/data/_async/client.py +++ b/google/cloud/bigtable/data/_async/client.py @@ -83,31 +83,34 @@ from google.cloud.bigtable_v2.services.bigtable.transports import ( BigtableGrpcAsyncIOTransport as TransportType, ) - from google.cloud.bigtable.data._async.mutations_batcher import ( - MutationsBatcherAsync, - _MB_SIZE, - ) - from google.cloud.bigtable.data.execute_query._async.execute_query_iterator import ( - ExecuteQueryIteratorAsync, - ) - + from google.cloud.bigtable.data._async.mutations_batcher import _MB_SIZE else: from typing import Iterable # noqa: F401 from grpc import insecure_channel from google.cloud.bigtable_v2.services.bigtable.transports import BigtableGrpcTransport as TransportType # type: ignore - from google.cloud.bigtable.data._sync_autogen.mutations_batcher import ( # noqa: F401 - MutationsBatcher, - _MB_SIZE, - ) - from google.cloud.bigtable.data.execute_query._sync_autogen.execute_query_iterator import ( # noqa: F401 - ExecuteQueryIterator, - ) + from google.cloud.bigtable.data._sync_autogen.mutations_batcher import _MB_SIZE if TYPE_CHECKING: from google.cloud.bigtable.data._helpers import RowKeySamples from google.cloud.bigtable.data._helpers import 
ShardedQuery + if CrossSync.is_async: + from google.cloud.bigtable.data._async.mutations_batcher import ( + MutationsBatcherAsync, + ) + from google.cloud.bigtable.data.execute_query._async.execute_query_iterator import ( + ExecuteQueryIteratorAsync, + ) + else: + from google.cloud.bigtable.data._sync_autogen.mutations_batcher import ( # noqa: F401 + MutationsBatcher, + ) + from google.cloud.bigtable.data.execute_query._sync_autogen.execute_query_iterator import ( # noqa: F401 + ExecuteQueryIterator, + ) + + __CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen.client" @@ -276,7 +279,9 @@ async def close(self, timeout: float | None = 2.0): @CrossSync.convert async def _ping_and_warm_instances( - self, instance_key: _WarmedInstanceKey | None = None, channel: Channel | None = None + self, + instance_key: _WarmedInstanceKey | None = None, + channel: Channel | None = None, ) -> list[BaseException | None]: """ Prepares the backend for requests on a channel @@ -366,8 +371,14 @@ async def _manage_channel( await self._ping_and_warm_instances(channel=new_channel) # cycle channel out of use, with long grace window before closure self.transport._grpc_channel = new_channel - await old_channel.close(grace_period) - # subtract the time spent waiting for the channel to be replaced + # give old_channel a chance to complete existing rpcs + if CrossSync.is_async: + await old_channel.close(grace_period) + else: + if grace_period: + self._is_closed.wait(grace_period) # type: ignore + old_channel.close() # type: ignore + # subtract the time spent waiting for the channel to be replaced next_refresh = random.uniform(refresh_interval_min, refresh_interval_max) next_sleep = max(next_refresh - (time.monotonic() - start_timestamp), 0) @@ -414,7 +425,7 @@ async def _register_instance( } ) async def _remove_instance_registration( - self, instance_id: str, owner: TableAsync | ExecuteQueryIteratorAsync + self, instance_id: str, owner: TableAsync | "ExecuteQueryIteratorAsync" ) -> 
bool: """ Removes an instance from the client's registered instances, to prevent @@ -578,7 +589,7 @@ async def execute_query( "proto_format": {}, } - return ExecuteQueryIteratorAsync( + return CrossSync.ExecuteQueryIterator( self, instance_id, app_profile_id, @@ -1125,7 +1136,7 @@ def mutations_batcher( batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, batch_retryable_errors: Sequence[type[Exception]] | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, - ) -> MutationsBatcherAsync: + ) -> "MutationsBatcherAsync": """ Returns a new mutations batcher instance. diff --git a/google/cloud/bigtable/data/_async/mutations_batcher.py b/google/cloud/bigtable/data/_async/mutations_batcher.py index f59128b2e..2e935ca67 100644 --- a/google/cloud/bigtable/data/_async/mutations_batcher.py +++ b/google/cloud/bigtable/data/_async/mutations_batcher.py @@ -176,9 +176,7 @@ async def add_to_flow(self, mutations: RowMutationEntry | list[RowMutationEntry] yield mutations[start_idx:end_idx] -@CrossSync.convert_class( - sync_name="MutationsBatcher", add_mapping_for_name="MutationsBatcher" -) +@CrossSync.convert_class(sync_name="MutationsBatcher") class MutationsBatcherAsync: """ Allows users to send batches using context manager API: diff --git a/google/cloud/bigtable/data/_sync_autogen/client.py b/google/cloud/bigtable/data/_sync_autogen/client.py index 3cc8cdbf2..af6449bbe 100644 --- a/google/cloud/bigtable/data/_sync_autogen/client.py +++ b/google/cloud/bigtable/data/_sync_autogen/client.py @@ -69,17 +69,17 @@ from google.cloud.bigtable_v2.services.bigtable.transports import ( BigtableGrpcTransport as TransportType, ) -from google.cloud.bigtable.data._sync_autogen.mutations_batcher import ( - MutationsBatcher, - _MB_SIZE, -) -from google.cloud.bigtable.data.execute_query._sync_autogen.execute_query_iterator import ( - ExecuteQueryIterator, -) +from google.cloud.bigtable.data._sync_autogen.mutations_batcher import _MB_SIZE if TYPE_CHECKING: from 
google.cloud.bigtable.data._helpers import RowKeySamples from google.cloud.bigtable.data._helpers import ShardedQuery + from google.cloud.bigtable.data._sync_autogen.mutations_batcher import ( + MutationsBatcher, + ) + from google.cloud.bigtable.data.execute_query._sync_autogen.execute_query_iterator import ( + ExecuteQueryIterator, + ) @CrossSync._Sync_Impl.add_mapping_decorator("DataClient") @@ -284,7 +284,9 @@ def _manage_channel( new_channel = self.transport.create_channel() self._ping_and_warm_instances(channel=new_channel) self.transport._grpc_channel = new_channel - old_channel.close(grace_period) + if grace_period: + self._is_closed.wait(grace_period) + old_channel.close() next_refresh = random.uniform(refresh_interval_min, refresh_interval_max) next_sleep = max(next_refresh - (time.monotonic() - start_timestamp), 0) @@ -314,7 +316,7 @@ def _register_instance( self._start_background_channel_refresh() def _remove_instance_registration( - self, instance_id: str, owner: Table | ExecuteQueryIterator + self, instance_id: str, owner: Table | "ExecuteQueryIterator" ) -> bool: """Removes an instance from the client's registered instances, to prevent warming new channels for the instance @@ -452,7 +454,7 @@ def execute_query( "params": pb_params, "proto_format": {}, } - return ExecuteQueryIterator( + return CrossSync._Sync_Impl.ExecuteQueryIterator( self, instance_id, app_profile_id, @@ -945,7 +947,7 @@ def mutations_batcher( batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, batch_retryable_errors: Sequence[type[Exception]] | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS, - ) -> MutationsBatcher: + ) -> "MutationsBatcher": """Returns a new mutations batcher instance. 
Can be used to iteratively add mutations that are flushed as a group, diff --git a/google/cloud/bigtable/data/_sync_autogen/mutations_batcher.py b/google/cloud/bigtable/data/_sync_autogen/mutations_batcher.py index c4f47b41c..3c2bbfc7f 100644 --- a/google/cloud/bigtable/data/_sync_autogen/mutations_batcher.py +++ b/google/cloud/bigtable/data/_sync_autogen/mutations_batcher.py @@ -146,7 +146,6 @@ def add_to_flow(self, mutations: RowMutationEntry | list[RowMutationEntry]): yield mutations[start_idx:end_idx] -@CrossSync._Sync_Impl.add_mapping_decorator("MutationsBatcher") class MutationsBatcher: """ Allows users to send batches using context manager API: diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 2e18d83ab..268e77c19 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -99,13 +99,11 @@ async def test_ctor(self): async def test_ctor_super_inits(self): from google.cloud.client import ClientWithProject from google.api_core import client_options as client_options_lib - from google.cloud.bigtable import __version__ as bigtable_version project = "project-id" credentials = AnonymousCredentials() client_options = {"api_endpoint": "foo.bar:1234"} options_parsed = client_options_lib.from_dict(client_options) - asyncio_portion = "-async" if CrossSync.is_async else "" with mock.patch.object( CrossSync.GapicClient, "__init__" ) as bigtable_client_init: @@ -213,15 +211,17 @@ async def test__start_background_channel_refresh_task_exists(self): @CrossSync.pytest async def test__start_background_channel_refresh(self): # should create background tasks for each channel - client = self._make_client(project="project-id", use_emulator=False) - ping_and_warm = CrossSync.Mock() - client._ping_and_warm_instances = ping_and_warm - client._start_background_channel_refresh() - assert client._channel_refresh_task is not None - assert isinstance(client._channel_refresh_task, asyncio.Task) - await 
asyncio.sleep(0.1) - assert ping_and_warm.call_count == 1 - await client.close() + client = self._make_client(project="project-id") + with mock.patch.object( + client, "_ping_and_warm_instances", CrossSync.Mock() + ) as ping_and_warm: + client._emulator_host = None + client._start_background_channel_refresh() + assert client._channel_refresh_task is not None + assert isinstance(client._channel_refresh_task, CrossSync.Task) + await CrossSync.sleep(0.1) + assert ping_and_warm.call_count == 1 + await client.close() @CrossSync.drop @CrossSync.pytest @@ -310,7 +310,6 @@ async def test__ping_and_warm_single_instance(self): CrossSync, "gather_partials", CrossSync.Mock() ) as gather: gather.side_effect = lambda *args, **kwargs: [fn() for fn in args[0]] - channel = mock.Mock() # test with large set of instances client_mock._active_instances = [mock.Mock()] * 100 test_key = ("test-instance", "test-table", "test-app-profile") @@ -419,7 +418,6 @@ async def test__manage_channel_sleeps( # make sure that sleeps work as expected import time import random - import threading channel = mock.Mock() channel.close = CrossSync.Mock() @@ -427,12 +425,7 @@ async def test__manage_channel_sleeps( uniform.side_effect = lambda min_, max_: min_ with mock.patch.object(time, "time") as time_mock: time_mock.return_value = 0 - sleep_tuple = ( - (asyncio, "sleep") - if CrossSync.is_async - else (threading.Event, "wait") - ) - with mock.patch.object(*sleep_tuple) as sleep: + with mock.patch.object(CrossSync, "event_wait") as sleep: sleep.side_effect = [None for i in range(num_cycles - 1)] + [ asyncio.CancelledError ] @@ -441,19 +434,14 @@ async def test__manage_channel_sleeps( try: if refresh_interval is not None: await client._manage_channel( - refresh_interval, refresh_interval + refresh_interval, refresh_interval, grace_period=0 ) else: - await client._manage_channel() + await client._manage_channel(grace_period=0) except asyncio.CancelledError: pass assert sleep.call_count == num_cycles - if 
CrossSync.is_async: - total_sleep = sum([call[0][0] for call in sleep.call_args_list]) - else: - total_sleep = sum( - [call[1]["timeout"] for call in sleep.call_args_list] - ) + total_sleep = sum([call[0][1] for call in sleep.call_args_list]) assert ( abs(total_sleep - expected_sleep) < 0.1 ), f"refresh_interval={refresh_interval}, num_cycles={num_cycles}, expected_sleep={expected_sleep}" @@ -462,12 +450,8 @@ async def test__manage_channel_sleeps( @CrossSync.pytest async def test__manage_channel_random(self): import random - import threading - sleep_tuple = ( - (asyncio, "sleep") if CrossSync.is_async else (threading.Event, "wait") - ) - with mock.patch.object(*sleep_tuple) as sleep: + with mock.patch.object(CrossSync, "event_wait") as sleep: with mock.patch.object(random, "uniform") as uniform: uniform.return_value = 0 try: @@ -483,7 +467,7 @@ async def test__manage_channel_random(self): uniform.side_effect = lambda min_, max_: min_ sleep.side_effect = [None, asyncio.CancelledError] try: - await client._manage_channel(min_val, max_val) + await client._manage_channel(min_val, max_val, grace_period=0) except asyncio.CancelledError: pass assert uniform.call_count == 2 @@ -496,28 +480,25 @@ async def test__manage_channel_random(self): @pytest.mark.parametrize("num_cycles", [0, 1, 10, 100]) async def test__manage_channel_refresh(self, num_cycles): # make sure that channels are properly refreshed - expected_grace = 9 expected_refresh = 0.5 grpc_lib = grpc.aio if CrossSync.is_async else grpc new_channel = grpc_lib.insecure_channel("localhost:8080") with mock.patch.object(CrossSync, "event_wait") as sleep: - sleep.side_effect = [None for i in range(num_cycles)] + [ - asyncio.CancelledError - ] + sleep.side_effect = [None for i in range(num_cycles)] + [RuntimeError] with mock.patch.object( CrossSync.grpc_helpers, "create_channel" ) as create_channel: create_channel.return_value = new_channel - client = self._make_client(project="project-id", use_emulator=False) + client = 
self._make_client(project="project-id") create_channel.reset_mock() try: await client._manage_channel( refresh_interval_min=expected_refresh, refresh_interval_max=expected_refresh, - grace_period=expected_grace, + grace_period=0, ) - except asyncio.CancelledError: + except RuntimeError: pass assert sleep.call_count == num_cycles + 1 assert create_channel.call_count == num_cycles @@ -932,12 +913,14 @@ async def test_close(self): task = client._channel_refresh_task assert task is not None assert not task.done() - with mock.patch.object(client.transport, "close", CrossSync.Mock()) as close_mock: + with mock.patch.object( + client.transport, "close", CrossSync.Mock() + ) as close_mock: await client.close() close_mock.assert_called_once() - close_mock.assert_awaited() + if CrossSync.is_async: + close_mock.assert_awaited() assert task.done() - assert task.cancelled() assert client._channel_refresh_task is None @CrossSync.pytest @@ -954,11 +937,16 @@ async def test_close_with_timeout(self): @CrossSync.pytest async def test_context_manager(self): + from functools import partial + # context manager should close the client cleanly close_mock = CrossSync.Mock() true_close = None - async with self._make_client(project="project-id", use_emulator=False) as client: - true_close = client.close() + async with self._make_client( + project="project-id", use_emulator=False + ) as client: + # grab reference to close coro for async test + true_close = partial(client.close) client.close = close_mock assert not client._channel_refresh_task.done() assert client.project == "project-id" @@ -968,7 +956,7 @@ async def test_context_manager(self): if CrossSync.is_async: close_mock.assert_awaited() # actually close the client - await true_close + await true_close() @CrossSync.drop def test_client_ctor_sync(self): @@ -1267,16 +1255,18 @@ async def test_customizable_retryable_errors( @CrossSync.pytest @CrossSync.convert async def test_call_metadata(self, include_app_profile, fn_name, fn_args, 
gapic_fn): - from google.cloud.bigtable.data import TableAsync - profile = "profile" if include_app_profile else None client = self._make_client() # create mock for rpc stub transport_mock = mock.MagicMock() rpc_mock = CrossSync.Mock() transport_mock._wrapped_methods.__getitem__.return_value = rpc_mock - client._gapic_client._client._transport = transport_mock - client._gapic_client._client._is_universe_domain_valid = True + gapic_client = client._gapic_client + if CrossSync.is_async: + # inner BigtableClient is held as ._client for BigtableAsyncClient + gapic_client = gapic_client._client + gapic_client._transport = transport_mock + gapic_client._is_universe_domain_valid = True table = self._get_target_class()(client, "instance-id", "table-id", profile) try: test_fn = table.__getattribute__(fn_name) @@ -1299,6 +1289,7 @@ async def test_call_metadata(self, include_app_profile, fn_name, fn_args, gapic_ else: assert "app_profile_id=" not in routing_str + @CrossSync.convert_class( "TestReadRows", add_mapping_for_name="TestReadRows", diff --git a/tests/unit/data/_sync_autogen/test_client.py b/tests/unit/data/_sync_autogen/test_client.py index 7d685fdf0..c0645adb4 100644 --- a/tests/unit/data/_sync_autogen/test_client.py +++ b/tests/unit/data/_sync_autogen/test_client.py @@ -80,7 +80,6 @@ def test_ctor_super_inits(self): credentials = AnonymousCredentials() client_options = {"api_endpoint": "foo.bar:1234"} options_parsed = client_options_lib.from_dict(client_options) - asyncio_portion = "-async" if CrossSync._Sync_Impl.is_async else "" with mock.patch.object( CrossSync._Sync_Impl.GapicClient, "__init__" ) as bigtable_client_init: @@ -165,15 +164,17 @@ def test__start_background_channel_refresh_task_exists(self): client.close() def test__start_background_channel_refresh(self): - client = self._make_client(project="project-id", use_emulator=False) - ping_and_warm = CrossSync._Sync_Impl.Mock() - client._ping_and_warm_instances = ping_and_warm - 
client._start_background_channel_refresh() - assert client._channel_refresh_task is not None - assert isinstance(client._channel_refresh_task, asyncio.Task) - asyncio.sleep(0.1) - assert ping_and_warm.call_count == 1 - client.close() + client = self._make_client(project="project-id") + with mock.patch.object( + client, "_ping_and_warm_instances", CrossSync._Sync_Impl.Mock() + ) as ping_and_warm: + client._emulator_host = None + client._start_background_channel_refresh() + assert client._channel_refresh_task is not None + assert isinstance(client._channel_refresh_task, CrossSync._Sync_Impl.Task) + CrossSync._Sync_Impl.sleep(0.1) + assert ping_and_warm.call_count == 1 + client.close() def test__ping_and_warm_instances(self): """test ping and warm with mocked asyncio.gather""" @@ -237,7 +238,6 @@ def test__ping_and_warm_single_instance(self): CrossSync._Sync_Impl, "gather_partials", CrossSync._Sync_Impl.Mock() ) as gather: gather.side_effect = lambda *args, **kwargs: [fn() for fn in args[0]] - channel = mock.Mock() client_mock._active_instances = [mock.Mock()] * 100 test_key = ("test-instance", "test-table", "test-app-profile") result = self._get_target_class()._ping_and_warm_instances( @@ -321,41 +321,30 @@ def test__manage_channel_ping_and_warm(self): def test__manage_channel_sleeps(self, refresh_interval, num_cycles, expected_sleep): import time import random - import threading channel = mock.Mock() - channel.close = mock.AsyncMock() + channel.close = CrossSync._Sync_Impl.Mock() with mock.patch.object(random, "uniform") as uniform: uniform.side_effect = lambda min_, max_: min_ with mock.patch.object(time, "time") as time_mock: time_mock.return_value = 0 - sleep_tuple = ( - (asyncio, "sleep") - if CrossSync._Sync_Impl.is_async - else (threading.Event, "wait") - ) - with mock.patch.object(*sleep_tuple) as sleep: + with mock.patch.object(CrossSync._Sync_Impl, "event_wait") as sleep: sleep.side_effect = [None for i in range(num_cycles - 1)] + [ asyncio.CancelledError ] 
client = self._make_client(project="project-id") client.transport._grpc_channel = channel - with mock.patch.object( - client.transport, "replace_channel", return_value=channel - ): - try: - if refresh_interval is not None: - client._manage_channel( - refresh_interval, refresh_interval - ) - else: - client._manage_channel() - except asyncio.CancelledError: - pass + try: + if refresh_interval is not None: + client._manage_channel( + refresh_interval, refresh_interval, grace_period=0 + ) + else: + client._manage_channel(grace_period=0) + except asyncio.CancelledError: + pass assert sleep.call_count == num_cycles - total_sleep = sum( - [call[1]["timeout"] for call in sleep.call_args_list] - ) + total_sleep = sum([call[0][1] for call in sleep.call_args_list]) assert ( abs(total_sleep - expected_sleep) < 0.1 ), f"refresh_interval={refresh_interval}, num_cycles={num_cycles}, expected_sleep={expected_sleep}" @@ -363,14 +352,8 @@ def test__manage_channel_sleeps(self, refresh_interval, num_cycles, expected_sle def test__manage_channel_random(self): import random - import threading - sleep_tuple = ( - (asyncio, "sleep") - if CrossSync._Sync_Impl.is_async - else (threading.Event, "wait") - ) - with mock.patch.object(*sleep_tuple) as sleep: + with mock.patch.object(CrossSync._Sync_Impl, "event_wait") as sleep: with mock.patch.object(random, "uniform") as uniform: uniform.return_value = 0 try: @@ -386,7 +369,7 @@ def test__manage_channel_random(self): uniform.side_effect = lambda min_, max_: min_ sleep.side_effect = [None, asyncio.CancelledError] try: - client._manage_channel(min_val, max_val) + client._manage_channel(min_val, max_val, grace_period=0) except asyncio.CancelledError: pass assert uniform.call_count == 2 @@ -397,27 +380,24 @@ def test__manage_channel_random(self): @pytest.mark.parametrize("num_cycles", [0, 1, 10, 100]) def test__manage_channel_refresh(self, num_cycles): - expected_grace = 9 expected_refresh = 0.5 grpc_lib = grpc.aio if CrossSync._Sync_Impl.is_async 
else grpc new_channel = grpc_lib.insecure_channel("localhost:8080") with mock.patch.object(CrossSync._Sync_Impl, "event_wait") as sleep: - sleep.side_effect = [None for i in range(num_cycles)] + [ - asyncio.CancelledError - ] + sleep.side_effect = [None for i in range(num_cycles)] + [RuntimeError] with mock.patch.object( CrossSync._Sync_Impl.grpc_helpers, "create_channel" ) as create_channel: create_channel.return_value = new_channel - client = self._make_client(project="project-id", use_emulator=False) + client = self._make_client(project="project-id") create_channel.reset_mock() try: client._manage_channel( refresh_interval_min=expected_refresh, refresh_interval_max=expected_refresh, - grace_period=expected_grace, + grace_period=0, ) - except asyncio.CancelledError: + except RuntimeError: pass assert sleep.call_count == num_cycles + 1 assert create_channel.call_count == num_cycles @@ -784,9 +764,7 @@ def test_close(self): ) as close_mock: client.close() close_mock.assert_called_once() - close_mock.assert_awaited() assert task.done() - assert task.cancelled() assert client._channel_refresh_task is None def test_close_with_timeout(self): @@ -801,17 +779,19 @@ def test_close_with_timeout(self): client.close() def test_context_manager(self): + from functools import partial + close_mock = CrossSync._Sync_Impl.Mock() true_close = None with self._make_client(project="project-id", use_emulator=False) as client: - true_close = client.close() + true_close = partial(client.close) client.close = close_mock assert not client._channel_refresh_task.done() assert client.project == "project-id" assert client._active_instances == set() close_mock.assert_not_called() close_mock.assert_called_once() - true_close + true_close() @CrossSync._Sync_Impl.add_mapping_decorator("TestTable") @@ -1032,10 +1012,11 @@ def test_call_metadata(self, include_app_profile, fn_name, fn_args, gapic_fn): profile = "profile" if include_app_profile else None client = self._make_client() transport_mock = 
mock.MagicMock() - rpc_mock = mock.AsyncMock() + rpc_mock = CrossSync._Sync_Impl.Mock() transport_mock._wrapped_methods.__getitem__.return_value = rpc_mock - client._gapic_client._client._transport = transport_mock - client._gapic_client._client._is_universe_domain_valid = True + gapic_client = client._gapic_client + gapic_client._transport = transport_mock + gapic_client._is_universe_domain_valid = True table = self._get_target_class()(client, "instance-id", "table-id", profile) try: test_fn = table.__getattribute__(fn_name)