Commit
restructured classes into separate files
daniel-sanche committed Nov 14, 2023
1 parent 1fa74d4 commit 2e751f8
Showing 10 changed files with 266 additions and 162 deletions.
5 changes: 2 additions & 3 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
@@ -24,7 +24,6 @@
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _convert_retry_deadline
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
-from google.cloud.bigtable.data._metrics import _OperationType

# mutate_rows requests are limited to this number of mutations
from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT
@@ -35,7 +34,7 @@
)
from google.cloud.bigtable.data.mutations import RowMutationEntry
from google.cloud.bigtable.data._async.client import TableAsync
-from google.cloud.bigtable.data._metrics import _ActiveOperationMetric
+from google.cloud.bigtable.data._metrics import ActiveOperationMetric


class _MutateRowsOperationAsync:
@@ -56,7 +55,7 @@ def __init__(
mutation_entries: list["RowMutationEntry"],
operation_timeout: float,
attempt_timeout: float | None,
-metrics: _ActiveOperationMetric,
+metrics: ActiveOperationMetric,
):
"""
Args:
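
The renamed `metrics` parameter is supplied by callers, as the `bulk_mutate_rows` and `mutations_batcher` hunks further down show. A condensed sketch of the call shape (not part of the diff; the leading constructor arguments are elided because they do not appear in these hunks, and the timeout values are placeholders):

    operation = _MutateRowsOperationAsync(
        ...,  # leading args are not shown in this diff
        mutation_entries,
        operation_timeout=60.0,  # placeholder value
        attempt_timeout=20.0,    # placeholder value
        metrics=table._metrics.create_operation(OperationType.BULK_MUTATE_ROWS),
    )
    await operation.start()
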
6 changes: 3 additions & 3 deletions google/cloud/bigtable/data/_async/_read_rows.py
@@ -37,7 +37,7 @@

if TYPE_CHECKING:
from google.cloud.bigtable.data._async.client import TableAsync
-from google.cloud.bigtable.data._metrics import _ActiveOperationMetric
+from google.cloud.bigtable.data._metrics import ActiveOperationMetric


class _ResetRow(Exception):
@@ -77,7 +77,7 @@ def __init__(
table: "TableAsync",
operation_timeout: float,
attempt_timeout: float,
-metrics: _ActiveOperationMetric,
+metrics: ActiveOperationMetric,
):
self.attempt_timeout_gen = _attempt_timeout_generator(
attempt_timeout, operation_timeout
@@ -221,7 +221,7 @@ async def chunk_stream(
@staticmethod
async def merge_rows(
chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None,
-operation: _ActiveOperationMetric,
+operation: ActiveOperationMetric,
):
"""
Merge chunks into rows
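
Both `_read_rows.py` changes sit inside an `if TYPE_CHECKING:` block or a type annotation, so the rename is annotation-only in this module, presumably to avoid a runtime import cycle with the client. A minimal self-contained sketch of that import pattern (the function name is illustrative, not from the diff):

    from __future__ import annotations  # annotations stay unevaluated at runtime

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # visible to type checkers only; no import happens at runtime
        from google.cloud.bigtable.data._metrics import ActiveOperationMetric

    async def merge_rows_sketch(operation: ActiveOperationMetric) -> None:
        ...
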
20 changes: 10 additions & 10 deletions google/cloud/bigtable/data/_async/client.py
@@ -74,8 +74,8 @@
from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter
from google.cloud.bigtable.data.row_filters import RowFilterChain

-from google.cloud.bigtable.data._metrics import BigtableClientSideMetrics
-from google.cloud.bigtable.data._metrics import _OperationType
+from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController
+from google.cloud.bigtable.data._metrics import OperationType


if TYPE_CHECKING:
@@ -509,7 +509,7 @@ def __init__(
)
self.default_mutate_rows_attempt_timeout = default_mutate_rows_attempt_timeout

-self._metrics = BigtableClientSideMetrics(
+self._metrics = BigtableClientSideMetricsController(
project_id=self.client.project, instance_id=instance_id, table_id=table_id, app_profile_id=app_profile_id
)

@@ -564,7 +564,7 @@ async def read_rows_stream(
# used so that read_row can disable is_streaming flag
metric_operation = kwargs.pop("metric_operation", None)
if metric_operation is None:
-metric_operation = self._metrics.create_operation(_OperationType.READ_ROWS, is_streaming=True)
+metric_operation = self._metrics.create_operation(OperationType.READ_ROWS, is_streaming=True)

row_merger = _ReadRowsOperationAsync(
query,
@@ -649,7 +649,7 @@ async def read_row(
"""
if row_key is None:
raise ValueError("row_key must be string or bytes")
-metric_operation = self._metrics.create_operation(_OperationType.READ_ROWS, is_streaming=False)
+metric_operation = self._metrics.create_operation(OperationType.READ_ROWS, is_streaming=False)
query = ReadRowsQuery(row_keys=row_key, row_filter=row_filter, limit=1)
results = await self.read_rows(
query,
@@ -853,7 +853,7 @@ def on_error_fn(exc):


# wrap rpc in retry and metric collection logic
-async with self._metrics.create_operation(_OperationType.SAMPLE_ROW_KEYS) as operation:
+async with self._metrics.create_operation(OperationType.SAMPLE_ROW_KEYS) as operation:

async def execute_rpc():
stream = await self.client._gapic_client.sample_row_keys(
@@ -996,7 +996,7 @@ def on_error_fn(exc):
)

# wrap rpc in retry and metric collection logic
-async with self._metrics.create_operation(_OperationType.MUTATE_ROW) as operation:
+async with self._metrics.create_operation(OperationType.MUTATE_ROW) as operation:
metric_wrapped = operation.wrap_attempt_fn(self.client._gapic_client.mutate_row, predicate)
retry_wrapped = retry(metric_wrapped)
# convert RetryErrors from retry wrapper into DeadlineExceeded errors
@@ -1054,7 +1054,7 @@ async def bulk_mutate_rows(
mutation_entries,
operation_timeout,
attempt_timeout,
-self._metrics.create_operation(_OperationType.BULK_MUTATE_ROWS),
+self._metrics.create_operation(OperationType.BULK_MUTATE_ROWS),
)
await operation.start()

@@ -1113,7 +1113,7 @@ async def check_and_mutate_row(
false_case_dict = [m._to_dict() for m in false_case_mutations or []]
metadata = _make_metadata(self.table_name, self.app_profile_id)

-async with self._metrics.create_operation(_OperationType.CHECK_AND_MUTATE) as operation:
+async with self._metrics.create_operation(OperationType.CHECK_AND_MUTATE) as operation:
metric_wrapped = operation.wrap_attempt_fn(self.client._gapic_client.check_and_mutate_row)
result = await metric_wrapped(
request={
@@ -1174,7 +1174,7 @@ async def read_modify_write_row(
rules_dict = [rule._to_dict() for rule in rules]
metadata = _make_metadata(self.table_name, self.app_profile_id)

-async with self._metrics.create_operation(_OperationType.READ_MODIFY_WRITE) as operation:
+async with self._metrics.create_operation(OperationType.READ_MODIFY_WRITE) as operation:
metric_wrapped = operation.wrap_attempt_fn(self.client._gapic_client.read_modify_write_row)

result = await metric_wrapped(
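
Every `client.py` hunk above follows the same shape: open a metric operation as an async context manager, wrap the raw gapic attempt, then await it. A minimal sketch of that flow, assuming a `TableAsync` instance named `table` (only calls that appear verbatim in the hunks are used):

    from google.cloud.bigtable.data._metrics import OperationType

    async def check_and_mutate_with_metrics(table, request):
        async with table._metrics.create_operation(OperationType.CHECK_AND_MUTATE) as operation:
            # wrap_attempt_fn instruments each RPC attempt for the active operation
            metric_wrapped = operation.wrap_attempt_fn(
                table.client._gapic_client.check_and_mutate_row
            )
            return await metric_wrapped(request=request)
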
4 changes: 2 additions & 2 deletions google/cloud/bigtable/data/_async/mutations_batcher.py
@@ -31,7 +31,7 @@
_MUTATE_ROWS_REQUEST_MUTATION_LIMIT,
)
from google.cloud.bigtable.data.mutations import Mutation
-from google.cloud.bigtable.data._metrics import _OperationType
+from google.cloud.bigtable.data._metrics import OperationType

if TYPE_CHECKING:
from google.cloud.bigtable.data._async.client import TableAsync
@@ -353,7 +353,7 @@ async def _execute_mutate_rows(
batch,
operation_timeout=self._operation_timeout,
attempt_timeout=self._attempt_timeout,
-metrics=self._table._metrics.create_operation(_OperationType.BULK_MUTATE_ROWS),
+metrics=self._table._metrics.create_operation(OperationType.BULK_MUTATE_ROWS),
)
await operation.start()
except MutationsExceptionGroup as e:
27 changes: 27 additions & 0 deletions google/cloud/bigtable/data/_metrics/__init__.py
@@ -0,0 +1,27 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from google.cloud.bigtable.data._metrics.handlers.opentelemetry import OpenTelemetryMetricsHandler
+from google.cloud.bigtable.data._metrics.handlers.stdout import StdoutMetricsHandler
+from google.cloud.bigtable.data._metrics.metrics_controller import BigtableClientSideMetricsController
+
+from google.cloud.bigtable.data._metrics.data_model import OperationType
+from google.cloud.bigtable.data._metrics.data_model import ActiveOperationMetric
+
+__all__ = (
+    "BigtableClientSideMetricsController",
+    "OpenTelemetryMetricsHandler",
+    "StdoutMetricsHandler",
+    "OperationType",
+    "ActiveOperationMetric",
+)
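
With this `__init__.py` in place, the rest of the package imports the metrics types from the package root, matching the `client.py` imports above. A usage sketch (the project, instance, table, and app-profile identifiers are placeholders; the constructor keywords come from the `client.py` hunk, and the return type of create_operation is inferred from the annotations in the other files):

    from google.cloud.bigtable.data._metrics import (
        ActiveOperationMetric,
        BigtableClientSideMetricsController,
        OperationType,
    )

    controller = BigtableClientSideMetricsController(
        project_id="my-project",      # placeholder
        instance_id="my-instance",    # placeholder
        table_id="my-table",          # placeholder
        app_profile_id="my-profile",  # placeholder
    )
    # create_operation() returns the ActiveOperationMetric that the operation
    # classes above accept as their `metrics` argument
    operation: ActiveOperationMetric = controller.create_operation(
        OperationType.READ_ROWS, is_streaming=True
    )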
