feat: async execute query client (#1011)
Co-authored-by: Mateusz Walkiewicz <[email protected]>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
3 people authored Aug 8, 2024
1 parent abe67c4 commit 45bc8c4
Showing 39 changed files with 3,855 additions and 78 deletions.
2 changes: 2 additions & 0 deletions google/cloud/bigtable/data/__init__.py
@@ -39,6 +39,7 @@
 from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
 from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
 from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
+from google.cloud.bigtable.data.exceptions import ParameterTypeInferenceFailed
 
 from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
 from google.cloud.bigtable.data._helpers import RowKeySamples
@@ -68,6 +69,7 @@
     "RetryExceptionGroup",
     "MutationsExceptionGroup",
     "ShardedReadRowsExceptionGroup",
+    "ParameterTypeInferenceFailed",
     "ShardedQuery",
     "TABLE_DEFAULT",
 )
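For context on the new export: `ParameterTypeInferenceFailed` is raised when a query parameter's SQL type cannot be inferred from its Python value. A minimal handling sketch, assuming the failure mode its name implies (the table and instance names are hypothetical):

```python
from google.cloud.bigtable.data import (
    BigtableDataClientAsync,
    ParameterTypeInferenceFailed,
)
from google.cloud.bigtable.data.execute_query.metadata import SqlType


async def query_with_nullable_param(client: BigtableDataClientAsync):
    try:
        # None carries no type information, so inference fails unless the
        # type is spelled out in parameter_types.
        return await client.execute_query(
            "SELECT * FROM my_table WHERE _key = @key",  # hypothetical table
            "my-instance",  # hypothetical instance ID
            parameters={"key": None},
        )
    except ParameterTypeInferenceFailed:
        # Retry with an explicit type for the ambiguous parameter.
        return await client.execute_query(
            "SELECT * FROM my_table WHERE _key = @key",
            "my-instance",
            parameters={"key": None},
            parameter_types={"key": SqlType.Bytes()},
        )
```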
4 changes: 3 additions & 1 deletion google/cloud/bigtable/data/_async/_mutate_rows.py
@@ -84,7 +84,9 @@ def __init__(
                 f"all entries. Found {total_mutations}."
             )
         # create partial function to pass to trigger rpc call
-        metadata = _make_metadata(table.table_name, table.app_profile_id)
+        metadata = _make_metadata(
+            table.table_name, table.app_profile_id, instance_name=None
+        )
         self._gapic_fn = functools.partial(
             gapic_client.mutate_rows,
             table_name=table.table_name,
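The recurring change across these files threads a new `instance_name` argument through `_make_metadata`: table-scoped RPCs pass `instance_name=None`, while `execute_query` (below) passes an instance name and no table name. The helper's body is not part of this diff; the following is a hypothetical reconstruction of the routing-header logic the call sites imply:

```python
from __future__ import annotations


def _make_metadata(
    table_name: str | None,
    app_profile_id: str | None,
    instance_name: str | None,
) -> list[tuple[str, str]]:
    # Hypothetical sketch; the real helper lives in
    # google/cloud/bigtable/data/_helpers.py and may differ in detail.
    if table_name is not None and instance_name is not None:
        raise ValueError("metadata can't contain both table_name and instance_name")
    params = []
    if table_name is not None:
        params.append(f"table_name={table_name}")
    if instance_name is not None:
        params.append(f"name={instance_name}")
    if app_profile_id is not None:
        params.append(f"app_profile_id={app_profile_id}")
    if not params:
        raise ValueError("At least one of table_name or instance_name must be set")
    # x-goog-request-params is the standard routing header for Google Cloud APIs.
    return [("x-goog-request-params", "&".join(params))]
```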
3 changes: 1 addition & 2 deletions google/cloud/bigtable/data/_async/_read_rows.py
@@ -102,8 +102,7 @@ def __init__(
         self.table = table
         self._predicate = retries.if_exception_type(*retryable_exceptions)
         self._metadata = _make_metadata(
-            table.table_name,
-            table.app_profile_id,
+            table.table_name, table.app_profile_id, instance_name=None
         )
         self._last_yielded_row_key: bytes | None = None
         self._remaining_count: int | None = self.request.rows_limit or None
234 changes: 177 additions & 57 deletions google/cloud/bigtable/data/_async/client.py
@@ -15,74 +15,88 @@
 
 from __future__ import annotations
 
+import asyncio
+from functools import partial
+import os
+import random
+import sys
+import time
 from typing import (
-    cast,
+    TYPE_CHECKING,
     Any,
     AsyncIterable,
+    Dict,
     Optional,
-    Set,
     Sequence,
-    TYPE_CHECKING,
+    Set,
+    Union,
+    cast,
 )
 
-import asyncio
-import grpc
-import time
 import warnings
-import sys
-import random
-import os
 
-from functools import partial
+from google.api_core import client_options as client_options_lib
+from google.api_core import retry as retries
+from google.api_core.exceptions import Aborted, DeadlineExceeded, ServiceUnavailable
+import google.auth._default
+import google.auth.credentials
+from google.cloud.client import ClientWithProject
+from google.cloud.environment_vars import BIGTABLE_EMULATOR  # type: ignore
+import grpc
+
+from google.cloud.bigtable.client import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
+from google.cloud.bigtable.data.execute_query._async.execute_query_iterator import (
+    ExecuteQueryIteratorAsync,
+)
+from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
+from google.cloud.bigtable.data._async._read_rows import _ReadRowsOperationAsync
+from google.cloud.bigtable.data._async.mutations_batcher import (
+    _MB_SIZE,
+    MutationsBatcherAsync,
+)
+from google.cloud.bigtable.data._helpers import (
+    _CONCURRENCY_LIMIT,
+    TABLE_DEFAULT,
+    _attempt_timeout_generator,
+    _get_error_type,
+    _get_retryable_errors,
+    _get_timeouts,
+    _make_metadata,
+    _retry_exception_factory,
+    _validate_timeouts,
+    _WarmedInstanceKey,
+)
+from google.cloud.bigtable.data.exceptions import (
+    FailedQueryShardError,
+    ShardedReadRowsExceptionGroup,
+)
+from google.cloud.bigtable.data.mutations import Mutation, RowMutationEntry
+from google.cloud.bigtable.data.read_modify_write_rules import ReadModifyWriteRule
+from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
+from google.cloud.bigtable.data.row import Row
+from google.cloud.bigtable.data.row_filters import (
+    CellsRowLimitFilter,
+    RowFilter,
+    RowFilterChain,
+    StripValueTransformerFilter,
+)
+from google.cloud.bigtable.data.execute_query.values import ExecuteQueryValueType
+from google.cloud.bigtable.data.execute_query.metadata import SqlType
+from google.cloud.bigtable.data.execute_query._parameters_formatting import (
+    _format_execute_query_params,
+)
+from google.cloud.bigtable_v2.services.bigtable.async_client import (
+    DEFAULT_CLIENT_INFO,
+    BigtableAsyncClient,
+)
 from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta
-from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient
-from google.cloud.bigtable_v2.services.bigtable.async_client import DEFAULT_CLIENT_INFO
 from google.cloud.bigtable_v2.services.bigtable.transports.pooled_grpc_asyncio import (
     PooledBigtableGrpcAsyncIOTransport,
     PooledChannel,
 )
 from google.cloud.bigtable_v2.types.bigtable import PingAndWarmRequest
-from google.cloud.client import ClientWithProject
-from google.cloud.environment_vars import BIGTABLE_EMULATOR  # type: ignore
-from google.api_core import retry as retries
-from google.api_core.exceptions import DeadlineExceeded
-from google.api_core.exceptions import ServiceUnavailable
-from google.api_core.exceptions import Aborted
-from google.cloud.bigtable.data._async._read_rows import _ReadRowsOperationAsync
-
-import google.auth.credentials
-import google.auth._default
-from google.api_core import client_options as client_options_lib
-from google.cloud.bigtable.client import _DEFAULT_BIGTABLE_EMULATOR_CLIENT
-from google.cloud.bigtable.data.row import Row
-from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
-from google.cloud.bigtable.data.exceptions import FailedQueryShardError
-from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
-
-from google.cloud.bigtable.data.mutations import Mutation, RowMutationEntry
-from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
-from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
-from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
-from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
-from google.cloud.bigtable.data._helpers import _make_metadata
-from google.cloud.bigtable.data._helpers import _retry_exception_factory
-from google.cloud.bigtable.data._helpers import _validate_timeouts
-from google.cloud.bigtable.data._helpers import _get_retryable_errors
-from google.cloud.bigtable.data._helpers import _get_timeouts
-from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
-from google.cloud.bigtable.data._async.mutations_batcher import MutationsBatcherAsync
-from google.cloud.bigtable.data._async.mutations_batcher import _MB_SIZE
-from google.cloud.bigtable.data.read_modify_write_rules import ReadModifyWriteRule
-from google.cloud.bigtable.data.row_filters import RowFilter
-from google.cloud.bigtable.data.row_filters import StripValueTransformerFilter
-from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter
-from google.cloud.bigtable.data.row_filters import RowFilterChain
 
 
 if TYPE_CHECKING:
-    from google.cloud.bigtable.data._helpers import RowKeySamples
-    from google.cloud.bigtable.data._helpers import ShardedQuery
+    from google.cloud.bigtable.data._helpers import RowKeySamples, ShardedQuery


 class BigtableDataClientAsync(ClientWithProject):
@@ -315,7 +329,9 @@ async def _manage_channel(
             next_refresh = random.uniform(refresh_interval_min, refresh_interval_max)
             next_sleep = next_refresh - (time.time() - start_timestamp)
 
-    async def _register_instance(self, instance_id: str, owner: TableAsync) -> None:
+    async def _register_instance(
+        self, instance_id: str, owner: Union[TableAsync, ExecuteQueryIteratorAsync]
+    ) -> None:
         """
         Registers an instance with the client, and warms the channel pool
         for the instance
@@ -346,7 +362,7 @@ async def _register_instance(self, instance_id: str, owner: TableAsync) -> None:
         self._start_background_channel_refresh()
 
     async def _remove_instance_registration(
-        self, instance_id: str, owner: TableAsync
+        self, instance_id: str, owner: Union[TableAsync, ExecuteQueryIteratorAsync]
     ) -> bool:
         """
         Removes an instance from the client's registered instances, to prevent
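Widening `owner` to `Union[TableAsync, ExecuteQueryIteratorAsync]` lets a running query keep its instance's channels registered and warm, just as an open table does. A rough sketch of the ownership bookkeeping this implies (a hypothetical simplification; the real state lives on the client, keyed by `_WarmedInstanceKey` from the imports above):

```python
# Hypothetical simplification of the client's owner bookkeeping.
instance_owners: dict[str, set[int]] = {}


def register(instance_name: str, owner: object) -> None:
    # Owners are tracked by id(), so tables and query iterators mix freely.
    instance_owners.setdefault(instance_name, set()).add(id(owner))


def remove_registration(instance_name: str, owner: object) -> bool:
    owners = instance_owners.get(instance_name)
    if owners is None or id(owner) not in owners:
        return False
    owners.discard(id(owner))
    if not owners:
        # Last owner gone: stop keeping this instance's channels warm.
        del instance_owners[instance_name]
    return True
```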
@@ -416,6 +432,102 @@ def get_table(self, instance_id: str, table_id: str, *args, **kwargs) -> TableAsync:
         """
         return TableAsync(self, instance_id, table_id, *args, **kwargs)
 
+    async def execute_query(
+        self,
+        query: str,
+        instance_id: str,
+        *,
+        parameters: Dict[str, ExecuteQueryValueType] | None = None,
+        parameter_types: Dict[str, SqlType.Type] | None = None,
+        app_profile_id: str | None = None,
+        operation_timeout: float = 600,
+        attempt_timeout: float | None = 20,
+        retryable_errors: Sequence[type[Exception]] = (
+            DeadlineExceeded,
+            ServiceUnavailable,
+            Aborted,
+        ),
+    ) -> "ExecuteQueryIteratorAsync":
+        """
+        Executes an SQL query on an instance.
+        Returns an iterator to asynchronously stream back columns from selected rows.
+        Failed requests within operation_timeout will be retried based on the
+        retryable_errors list until operation_timeout is reached.
+        Args:
+        - query: Query to be run on the Bigtable instance. The query can use ``@param``
+            placeholders to use parameter interpolation on the server. Values for all
+            parameters should be provided in ``parameters``. Types of parameters are
+            inferred but should be provided in ``parameter_types`` if the inference is
+            not possible (i.e. when the value can be None, an empty list, or an empty dict).
+        - instance_id: The Bigtable instance ID to perform the query on.
+            instance_id is combined with the client's project to fully
+            specify the instance.
+        - parameters: Dictionary with values for all parameters used in the ``query``.
+        - parameter_types: Dictionary with types of parameters used in the ``query``.
+            Required to contain entries only for parameters whose type cannot be
+            detected automatically (i.e. the value can be None, an empty list, or
+            an empty dict).
+        - app_profile_id: The app profile to associate with requests.
+            https://cloud.google.com/bigtable/docs/app-profiles
+        - operation_timeout: the time budget for the entire operation, in seconds.
+            Failed requests will be retried within the budget.
+            Defaults to 600 seconds.
+        - attempt_timeout: the time budget for an individual network request, in seconds.
+            If it takes longer than this time to complete, the request will be cancelled with
+            a DeadlineExceeded exception, and a retry will be attempted.
+            Defaults to 20 seconds.
+            If None, defaults to operation_timeout.
+        - retryable_errors: a list of errors that will be retried if encountered.
+            Defaults to 4 (DeadlineExceeded), 14 (ServiceUnavailable), and 10 (Aborted).
+        Returns:
+        - an asynchronous iterator that yields rows returned by the query
+        Raises:
+        - DeadlineExceeded: raised after the operation timeout is reached;
+            will be chained with a RetryExceptionGroup containing GoogleAPIError exceptions
+            from any retries that failed
+        - GoogleAPIError: raised if the request encounters an unrecoverable error
+        """
+        warnings.warn(
+            "ExecuteQuery is in preview and may change in the future.",
+            category=RuntimeWarning,
+        )
+
+        retryable_excs = [_get_error_type(e) for e in retryable_errors]
+
+        pb_params = _format_execute_query_params(parameters, parameter_types)
+
+        instance_name = self._gapic_client.instance_path(self.project, instance_id)
+
+        request_body = {
+            "instance_name": instance_name,
+            "app_profile_id": app_profile_id,
+            "query": query,
+            "params": pb_params,
+            "proto_format": {},
+        }
+
+        # app_profile_id should be set to an empty string for ExecuteQueryRequest only
+        app_profile_id_for_metadata = app_profile_id or ""
+
+        req_metadata = _make_metadata(
+            table_name=None,
+            app_profile_id=app_profile_id_for_metadata,
+            instance_name=instance_name,
+        )
+
+        return ExecuteQueryIteratorAsync(
+            self,
+            instance_id,
+            app_profile_id,
+            request_body,
+            attempt_timeout,
+            operation_timeout,
+            req_metadata,
+            retryable_excs,
+        )
+
     async def __aenter__(self):
         self._start_background_channel_refresh()
         return self
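A usage sketch for the new method (the project, instance, and table names are hypothetical, and the row subscript access is an assumption about the result-row interface):

```python
import asyncio

from google.cloud.bigtable.data import BigtableDataClientAsync


async def main() -> None:
    async with BigtableDataClientAsync(project="my-project") as client:
        results = await client.execute_query(
            "SELECT _key FROM my_table WHERE _key = @key",
            "my-instance",
            parameters={"key": b"row-1"},  # bytes: type inferred automatically
        )
        # The returned ExecuteQueryIteratorAsync streams rows asynchronously.
        async for row in results:
            print(row["_key"])


asyncio.run(main())
```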
@@ -893,7 +1005,9 @@ async def sample_row_keys(
         sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
 
         # prepare request
-        metadata = _make_metadata(self.table_name, self.app_profile_id)
+        metadata = _make_metadata(
+            self.table_name, self.app_profile_id, instance_name=None
+        )
 
         async def execute_rpc():
             results = await self.client._gapic_client.sample_row_keys(
@@ -1029,7 +1143,9 @@ async def mutate_row(
             table_name=self.table_name,
             app_profile_id=self.app_profile_id,
             timeout=attempt_timeout,
-            metadata=_make_metadata(self.table_name, self.app_profile_id),
+            metadata=_make_metadata(
+                self.table_name, self.app_profile_id, instance_name=None
+            ),
             retry=None,
         )
         return await retries.retry_target_async(
@@ -1147,7 +1263,9 @@ async def check_and_mutate_row(
         ):
             false_case_mutations = [false_case_mutations]
         false_case_list = [m._to_pb() for m in false_case_mutations or []]
-        metadata = _make_metadata(self.table_name, self.app_profile_id)
+        metadata = _make_metadata(
+            self.table_name, self.app_profile_id, instance_name=None
+        )
         result = await self.client._gapic_client.check_and_mutate_row(
             true_mutations=true_case_list,
             false_mutations=false_case_list,
@@ -1198,7 +1316,9 @@ async def read_modify_write_row(
             rules = [rules]
         if not rules:
             raise ValueError("rules must contain at least one item")
-        metadata = _make_metadata(self.table_name, self.app_profile_id)
+        metadata = _make_metadata(
+            self.table_name, self.app_profile_id, instance_name=None
+        )
         result = await self.client._gapic_client.read_modify_write_row(
             rules=[rule._to_pb() for rule in rules],
             row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
(The remaining 35 changed files are not shown here.)
