Skip to content

Commit

Permalink
chore: remove pooled transport (#1035)
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche authored Nov 8, 2024
1 parent 0f63a74 commit 8b8a544
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 807 deletions.
1 change: 0 additions & 1 deletion gapic-generator-fork
Submodule gapic-generator-fork deleted from b26cda
113 changes: 49 additions & 64 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,8 @@
DEFAULT_CLIENT_INFO,
BigtableAsyncClient,
)
from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta
from google.cloud.bigtable_v2.services.bigtable.transports.pooled_grpc_asyncio import (
PooledBigtableGrpcAsyncIOTransport,
PooledChannel,
from google.cloud.bigtable_v2.services.bigtable.transports import (
BigtableGrpcAsyncIOTransport,
)
from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest

Expand All @@ -103,11 +101,11 @@ def __init__(
self,
*,
project: str | None = None,
pool_size: int = 3,
credentials: google.auth.credentials.Credentials | None = None,
client_options: dict[str, Any]
| "google.api_core.client_options.ClientOptions"
| None = None,
**kwargs,
):
"""
Create a client instance for the Bigtable Data API
Expand All @@ -118,8 +116,6 @@ def __init__(
project: the project which the client acts on behalf of.
If not passed, falls back to the default inferred
from the environment.
pool_size: The number of grpc channels to maintain
in the internal channel pool.
credentials:
The OAuth2 Credentials to use for this
client. If not passed (and if no ``_http`` object is
Expand All @@ -130,12 +126,9 @@ def __init__(
on the client. API Endpoint should be set through client_options.
Raises:
RuntimeError: if called outside of an async context (no running event loop)
ValueError: if pool_size is less than 1
"""
# set up transport in registry
transport_str = f"pooled_grpc_asyncio_{pool_size}"
transport = PooledBigtableGrpcAsyncIOTransport.with_fixed_size(pool_size)
BigtableClientMeta._transport_registry[transport_str] = transport
if "pool_size" in kwargs:
warnings.warn("pool_size no longer supported")
# set up client info headers for veneer library
client_info = DEFAULT_CLIENT_INFO
client_info.client_library_version = self._client_version()
Expand All @@ -145,9 +138,16 @@ def __init__(
client_options = cast(
Optional[client_options_lib.ClientOptions], client_options
)
custom_channel = None
self._emulator_host = os.getenv(BIGTABLE_EMULATOR)
if self._emulator_host is not None:
warnings.warn(
"Connecting to Bigtable emulator at {}".format(self._emulator_host),
RuntimeWarning,
stacklevel=2,
)
# use insecure channel if emulator is set
custom_channel = grpc.aio.insecure_channel(self._emulator_host)
if credentials is None:
credentials = google.auth.credentials.AnonymousCredentials()
if project is None:
Expand All @@ -160,37 +160,24 @@ def __init__(
client_options=client_options,
)
self._gapic_client = BigtableAsyncClient(
transport=transport_str,
credentials=credentials,
client_options=client_options,
client_info=client_info,
transport=lambda *args, **kwargs: BigtableGrpcAsyncIOTransport(
*args, **kwargs, channel=custom_channel
),
)
self.transport = cast(
PooledBigtableGrpcAsyncIOTransport, self._gapic_client.transport
BigtableGrpcAsyncIOTransport, self._gapic_client.transport
)
# keep track of active instances for warmup on channel refresh
self._active_instances: Set[_WarmedInstanceKey] = set()
# keep track of table objects associated with each instance
# only remove instance from _active_instances when all associated tables remove it
self._instance_owners: dict[_WarmedInstanceKey, Set[int]] = {}
self._channel_init_time = time.monotonic()
self._channel_refresh_tasks: list[asyncio.Task[None]] = []
if self._emulator_host is not None:
# connect to an emulator host
warnings.warn(
"Connecting to Bigtable emulator at {}".format(self._emulator_host),
RuntimeWarning,
stacklevel=2,
)
self.transport._grpc_channel = PooledChannel(
pool_size=pool_size,
host=self._emulator_host,
insecure=True,
)
# refresh cached stubs to use emulator pool
self.transport._stubs = {}
self.transport._prep_wrapped_messages(client_info)
else:
self._channel_refresh_task: asyncio.Task[None] | None = None
if self._emulator_host is None:
# attempt to start background channel refresh tasks
try:
self._start_background_channel_refresh()
Expand All @@ -211,48 +198,51 @@ def _client_version() -> str:

def _start_background_channel_refresh(self) -> None:
"""
Starts a background task to ping and warm each channel in the pool
Starts a background task to ping and warm the grpc channel
Raises:
RuntimeError: if not called in an asyncio event loop
"""
if not self._channel_refresh_tasks and not self._emulator_host:
if not self._channel_refresh_task and not self._emulator_host:
# raise RuntimeError if there is no event loop
asyncio.get_running_loop()
for channel_idx in range(self.transport.pool_size):
refresh_task = asyncio.create_task(self._manage_channel(channel_idx))
if sys.version_info >= (3, 8):
# task names supported in Python 3.8+
refresh_task.set_name(
f"{self.__class__.__name__} channel refresh {channel_idx}"
)
self._channel_refresh_tasks.append(refresh_task)
self._channel_refresh_task = asyncio.create_task(self._manage_channel())
if sys.version_info >= (3, 8):
# task names supported in Python 3.8+
self._channel_refresh_task.set_name(
f"{self.__class__.__name__} channel refresh"
)

async def close(self, timeout: float = 2.0):
"""
Cancel all background tasks
"""
for task in self._channel_refresh_tasks:
task.cancel()
group = asyncio.gather(*self._channel_refresh_tasks, return_exceptions=True)
await asyncio.wait_for(group, timeout=timeout)
if self._channel_refresh_task:
self._channel_refresh_task.cancel()
try:
await asyncio.wait_for(self._channel_refresh_task, timeout=timeout)
except asyncio.CancelledError:
pass
await self.transport.close()
self._channel_refresh_tasks = []
self._channel_refresh_task = None

async def _ping_and_warm_instances(
self, channel: grpc.aio.Channel, instance_key: _WarmedInstanceKey | None = None
self,
instance_key: _WarmedInstanceKey | None = None,
channel: grpc.aio.Channel | None = None,
) -> list[BaseException | None]:
"""
Prepares the backend for requests on a channel
Pings each Bigtable instance registered in `_active_instances` on the client
Args:
channel: grpc channel to warm
instance_key: if provided, only warm the instance associated with the key
channel: grpc channel to warm. If None, warms `self.transport.grpc_channel`
Returns:
list[BaseException | None]: sequence of results or exceptions from the ping requests
"""
channel = channel or self.transport.grpc_channel
instance_list = (
[instance_key] if instance_key is not None else self._active_instances
)
Expand Down Expand Up @@ -280,7 +270,6 @@ async def _ping_and_warm_instances(

async def _manage_channel(
self,
channel_idx: int,
refresh_interval_min: float = 60 * 35,
refresh_interval_max: float = 60 * 45,
grace_period: float = 60 * 10,
Expand All @@ -294,7 +283,6 @@ async def _manage_channel(
Runs continuously until the client is closed
Args:
channel_idx: index of the channel in the transport's channel pool
refresh_interval_min: minimum interval before initiating refresh
process in seconds. Actual interval will be a random value
between `refresh_interval_min` and `refresh_interval_max`
Expand All @@ -310,19 +298,18 @@ async def _manage_channel(
next_sleep = max(first_refresh - time.monotonic(), 0)
if next_sleep > 0:
# warm the current channel immediately
channel = self.transport.channels[channel_idx]
await self._ping_and_warm_instances(channel)
await self._ping_and_warm_instances(channel=self.transport.grpc_channel)
# continuously refresh the channel every `refresh_interval` seconds
while True:
await asyncio.sleep(next_sleep)
start_timestamp = time.time()
# prepare new channel for use
new_channel = self.transport.grpc_channel._create_channel()
await self._ping_and_warm_instances(new_channel)
old_channel = self.transport.grpc_channel
new_channel = self.transport.create_channel()
await self._ping_and_warm_instances(channel=new_channel)
# cycle channel out of use, with long grace window before closure
start_timestamp = time.time()
await self.transport.replace_channel(
channel_idx, grace=grace_period, swap_sleep=10, new_channel=new_channel
)
self.transport._grpc_channel = new_channel
await old_channel.close(grace_period)
# subtract the time spent waiting for the channel to be replaced
next_refresh = random.uniform(refresh_interval_min, refresh_interval_max)
next_sleep = next_refresh - (time.time() - start_timestamp)
Expand All @@ -331,9 +318,8 @@ async def _register_instance(
self, instance_id: str, owner: Union[TableAsync, ExecuteQueryIteratorAsync]
) -> None:
"""
Registers an instance with the client, and warms the channel pool
for the instance
The client will periodically refresh grpc channel pool used to make
Registers an instance with the client, and warms the channel for the instance
The client will periodically refresh grpc channel used to make
requests, and new channels will be warmed for each registered instance
Channels will not be refreshed unless at least one instance is registered
Expand All @@ -350,11 +336,10 @@ async def _register_instance(
self._instance_owners.setdefault(instance_key, set()).add(id(owner))
if instance_key not in self._active_instances:
self._active_instances.add(instance_key)
if self._channel_refresh_tasks:
if self._channel_refresh_task:
# refresh tasks already running
# call ping and warm on all existing channels
for channel in self.transport.channels:
await self._ping_and_warm_instances(channel, instance_key)
await self._ping_and_warm_instances(instance_key)
else:
# refresh tasks aren't active. start them as background tasks
self._start_background_channel_refresh()
Expand Down
2 changes: 0 additions & 2 deletions google/cloud/bigtable_v2/services/bigtable/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
from .transports.base import BigtableTransport, DEFAULT_CLIENT_INFO
from .transports.grpc import BigtableGrpcTransport
from .transports.grpc_asyncio import BigtableGrpcAsyncIOTransport
from .transports.pooled_grpc_asyncio import PooledBigtableGrpcAsyncIOTransport
from .transports.rest import BigtableRestTransport


Expand All @@ -70,7 +69,6 @@ class BigtableClientMeta(type):
_transport_registry = OrderedDict() # type: Dict[str, Type[BigtableTransport]]
_transport_registry["grpc"] = BigtableGrpcTransport
_transport_registry["grpc_asyncio"] = BigtableGrpcAsyncIOTransport
_transport_registry["pooled_grpc_asyncio"] = PooledBigtableGrpcAsyncIOTransport
_transport_registry["rest"] = BigtableRestTransport

def get_transport_class(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from .base import BigtableTransport
from .grpc import BigtableGrpcTransport
from .grpc_asyncio import BigtableGrpcAsyncIOTransport
from .pooled_grpc_asyncio import PooledBigtableGrpcAsyncIOTransport
from .rest import BigtableRestTransport
from .rest import BigtableRestInterceptor

Expand All @@ -28,14 +27,12 @@
_transport_registry = OrderedDict() # type: Dict[str, Type[BigtableTransport]]
_transport_registry["grpc"] = BigtableGrpcTransport
_transport_registry["grpc_asyncio"] = BigtableGrpcAsyncIOTransport
_transport_registry["pooled_grpc_asyncio"] = PooledBigtableGrpcAsyncIOTransport
_transport_registry["rest"] = BigtableRestTransport

__all__ = (
"BigtableTransport",
"BigtableGrpcTransport",
"BigtableGrpcAsyncIOTransport",
"PooledBigtableGrpcAsyncIOTransport",
"BigtableRestTransport",
"BigtableRestInterceptor",
)
Loading

0 comments on commit 8b8a544

Please sign in to comment.