Skip to content

Commit

Permalink
remove custom _make_metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Nov 4, 2024
1 parent 2bca8fb commit dcd59c3
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 144 deletions.
5 changes: 0 additions & 5 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from google.api_core import retry as retries
import google.cloud.bigtable_v2.types.bigtable as types_pb
import google.cloud.bigtable.data.exceptions as bt_exceptions
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _retry_exception_factory

Expand Down Expand Up @@ -84,14 +83,10 @@ def __init__(
f"all entries. Found {total_mutations}."
)
# create partial function to pass to trigger rpc call
metadata = _make_metadata(
table.table_name, table.app_profile_id, instance_name=None
)
self._gapic_fn = functools.partial(
gapic_client.mutate_rows,
table_name=table.table_name,
app_profile_id=table.app_profile_id,
metadata=metadata,
retry=None,
)
# create predicate for determining which errors are retryable
Expand Down
6 changes: 0 additions & 6 deletions google/cloud/bigtable/data/_async/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from google.cloud.bigtable.data.exceptions import InvalidChunk
from google.cloud.bigtable.data.exceptions import _RowSetComplete
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _retry_exception_factory

from google.api_core import retry as retries
Expand Down Expand Up @@ -74,7 +73,6 @@ class _ReadRowsOperationAsync:
"request",
"table",
"_predicate",
"_metadata",
"_last_yielded_row_key",
"_remaining_count",
)
Expand All @@ -101,9 +99,6 @@ def __init__(
self.request = query._to_pb(table)
self.table = table
self._predicate = retries.if_exception_type(*retryable_exceptions)
self._metadata = _make_metadata(
table.table_name, table.app_profile_id, instance_name=None
)
self._last_yielded_row_key: bytes | None = None
self._remaining_count: int | None = self.request.rows_limit or None

Expand Down Expand Up @@ -152,7 +147,6 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
gapic_stream = self.table.client._gapic_client.read_rows(
self.request,
timeout=next(self.attempt_timeout_gen),
metadata=self._metadata,
retry=None,
)
chunked_stream = self.chunk_stream(gapic_stream)
Expand Down
52 changes: 12 additions & 40 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
_get_error_type,
_get_retryable_errors,
_get_timeouts,
_make_metadata,
_retry_exception_factory,
_validate_timeouts,
_WarmedInstanceKey,
Expand Down Expand Up @@ -262,19 +261,18 @@ async def _ping_and_warm_instances(
request_serializer=PingAndWarmRequest.serialize,
)
# prepare list of coroutines to run
tasks = [
ping_rpc(
request={"name": instance_name, "app_profile_id": app_profile_id},
metadata=[
(
"x-goog-request-params",
f"name={instance_name}&app_profile_id={app_profile_id}",
)
],
wait_for_ready=True,
tasks = []
for (instance_name, table_name, app_profile_id) in instance_list:
metadata_str = f"name={instance_name}"
if app_profile_id is not None:
metadata_str = f"{metadata_str}&app_profile_id={app_profile_id}"
tasks.append(
ping_rpc(
request={"name": instance_name, "app_profile_id": app_profile_id},
metadata=[("x-goog-request-params", metadata_str)],
wait_for_ready=True,
)
)
for (instance_name, table_name, app_profile_id) in instance_list
]
# execute coroutines in parallel
result_list = await asyncio.gather(*tasks, return_exceptions=True)
# return None in place of empty successful responses
Expand Down Expand Up @@ -508,24 +506,14 @@ async def execute_query(
"proto_format": {},
}

# app_profile_id should be set to an empty string for ExecuteQueryRequest only
app_profile_id_for_metadata = app_profile_id or ""

req_metadata = _make_metadata(
table_name=None,
app_profile_id=app_profile_id_for_metadata,
instance_name=instance_name,
)

return ExecuteQueryIteratorAsync(
self,
instance_id,
app_profile_id,
request_body,
attempt_timeout,
operation_timeout,
req_metadata,
retryable_excs,
retryable_excs=retryable_excs,
)

async def __aenter__(self):
Expand Down Expand Up @@ -1005,16 +993,11 @@ async def sample_row_keys(
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

# prepare request
metadata = _make_metadata(
self.table_name, self.app_profile_id, instance_name=None
)

async def execute_rpc():
results = await self.client._gapic_client.sample_row_keys(
table_name=self.table_name,
app_profile_id=self.app_profile_id,
timeout=next(attempt_timeout_gen),
metadata=metadata,
retry=None,
)
return [(s.row_key, s.offset_bytes) async for s in results]
Expand Down Expand Up @@ -1143,9 +1126,6 @@ async def mutate_row(
table_name=self.table_name,
app_profile_id=self.app_profile_id,
timeout=attempt_timeout,
metadata=_make_metadata(
self.table_name, self.app_profile_id, instance_name=None
),
retry=None,
)
return await retries.retry_target_async(
Expand Down Expand Up @@ -1263,17 +1243,13 @@ async def check_and_mutate_row(
):
false_case_mutations = [false_case_mutations]
false_case_list = [m._to_pb() for m in false_case_mutations or []]
metadata = _make_metadata(
self.table_name, self.app_profile_id, instance_name=None
)
result = await self.client._gapic_client.check_and_mutate_row(
true_mutations=true_case_list,
false_mutations=false_case_list,
predicate_filter=predicate._to_pb() if predicate is not None else None,
row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
table_name=self.table_name,
app_profile_id=self.app_profile_id,
metadata=metadata,
timeout=operation_timeout,
retry=None,
)
Expand Down Expand Up @@ -1316,15 +1292,11 @@ async def read_modify_write_row(
rules = [rules]
if not rules:
raise ValueError("rules must contain at least one item")
metadata = _make_metadata(
self.table_name, self.app_profile_id, instance_name=None
)
result = await self.client._gapic_client.read_modify_write_row(
rules=[rule._to_pb() for rule in rules],
row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
table_name=self.table_name,
app_profile_id=self.app_profile_id,
metadata=metadata,
timeout=operation_timeout,
retry=None,
)
Expand Down
25 changes: 0 additions & 25 deletions google/cloud/bigtable/data/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,6 @@ class TABLE_DEFAULT(enum.Enum):
MUTATE_ROWS = "MUTATE_ROWS_DEFAULT"


def _make_metadata(
table_name: str | None, app_profile_id: str | None, instance_name: str | None
) -> list[tuple[str, str]]:
"""
Create properly formatted gRPC metadata for requests.
"""
params = []

if table_name is not None and instance_name is not None:
raise ValueError("metadata can't contain both instance_name and table_name")

if table_name is not None:
params.append(f"table_name={table_name}")
if instance_name is not None:
params.append(f"name={instance_name}")
if app_profile_id is not None:
params.append(f"app_profile_id={app_profile_id}")
if len(params) == 0:
raise ValueError(
"At least one of table_name and app_profile_id should be not None."
)
params_str = "&".join(params)
return [("x-goog-request-params", params_str)]


def _attempt_timeout_generator(
per_request_timeout: float | None, operation_timeout: float
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ def __init__(
request_body: Dict[str, Any],
attempt_timeout: float | None,
operation_timeout: float,
req_metadata: Sequence[Tuple[str, str]],
retryable_excs: List[type[Exception]],
req_metadata: Sequence[Tuple[str, str]] | None = None,
retryable_excs: List[type[Exception]] | None = None,
) -> None:
self._table_name = None
self._app_profile_id = app_profile_id
Expand All @@ -99,6 +99,7 @@ def __init__(
self._attempt_timeout_gen = _attempt_timeout_generator(
attempt_timeout, operation_timeout
)
retryable_excs = retryable_excs or []
self._async_stream = retries.retry_target_stream_async(
self._make_request_with_resume_token,
retries.if_exception_type(*retryable_excs),
Expand Down
7 changes: 1 addition & 6 deletions tests/unit/data/_async/test__mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,10 @@ def test_ctor(self):
assert client.mutate_rows.call_count == 1
# gapic_fn should call with table details
inner_kwargs = client.mutate_rows.call_args[1]
assert len(inner_kwargs) == 4
assert len(inner_kwargs) == 3
assert inner_kwargs["table_name"] == table.table_name
assert inner_kwargs["app_profile_id"] == table.app_profile_id
assert inner_kwargs["retry"] is None
metadata = inner_kwargs["metadata"]
assert len(metadata) == 1
assert metadata[0][0] == "x-goog-request-params"
assert str(table.table_name) in metadata[0][1]
assert str(table.app_profile_id) in metadata[0][1]
# entries should be passed down
entries_w_pb = [_EntryWithProto(e, e._to_pb()) for e in entries]
assert instance.mutations == entries_w_pb
Expand Down
6 changes: 0 additions & 6 deletions tests/unit/data/_async/test__read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,6 @@ def test_ctor(self):
assert instance._remaining_count == row_limit
assert instance.operation_timeout == expected_operation_timeout
assert client.read_rows.call_count == 0
assert instance._metadata == [
(
"x-goog-request-params",
"table_name=test_table&app_profile_id=test_profile",
)
]
assert instance.request.table_name == table.table_name
assert instance.request.app_profile_id == table.app_profile_id
assert instance.request.rows_limit == row_limit
Expand Down
27 changes: 1 addition & 26 deletions tests/unit/data/_async/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2176,11 +2176,10 @@ async def test_sample_row_keys_gapic_params(self):
await table.sample_row_keys(attempt_timeout=expected_timeout)
args, kwargs = sample_row_keys.call_args
assert len(args) == 0
assert len(kwargs) == 5
assert len(kwargs) == 4
assert kwargs["timeout"] == expected_timeout
assert kwargs["app_profile_id"] == expected_profile
assert kwargs["table_name"] == table.table_name
assert kwargs["metadata"] is not None
assert kwargs["retry"] is None

@pytest.mark.parametrize(
Expand Down Expand Up @@ -2375,30 +2374,6 @@ async def test_mutate_row_non_retryable_errors(self, non_retryable_exception):
"row_key", mutation, operation_timeout=0.2
)

@pytest.mark.parametrize("include_app_profile", [True, False])
@pytest.mark.asyncio
async def test_mutate_row_metadata(self, include_app_profile):
"""request should attach metadata headers"""
profile = "profile" if include_app_profile else None
async with _make_client() as client:
async with client.get_table("i", "t", app_profile_id=profile) as table:
with mock.patch.object(
client._gapic_client, "mutate_row", AsyncMock()
) as read_rows:
await table.mutate_row("rk", mock.Mock())
kwargs = read_rows.call_args_list[0].kwargs
metadata = kwargs["metadata"]
goog_metadata = None
for key, value in metadata:
if key == "x-goog-request-params":
goog_metadata = value
assert goog_metadata is not None, "x-goog-request-params not found"
assert "table_name=" + table.table_name in goog_metadata
if include_app_profile:
assert "app_profile_id=profile" in goog_metadata
else:
assert "app_profile_id=" not in goog_metadata

@pytest.mark.parametrize("mutations", [[], None])
@pytest.mark.asyncio
async def test_mutate_row_no_mutations(self, mutations):
Expand Down
28 changes: 0 additions & 28 deletions tests/unit/data/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,6 @@
import mock


class TestMakeMetadata:
@pytest.mark.parametrize(
"table,profile,instance,expected",
[
("table", "profile", None, "table_name=table&app_profile_id=profile"),
("table", None, None, "table_name=table"),
(None, None, "instance", "name=instance"),
(None, "profile", None, "app_profile_id=profile"),
(None, "profile", "instance", "name=instance&app_profile_id=profile"),
],
)
def test__make_metadata(self, table, profile, instance, expected):
metadata = _helpers._make_metadata(table, profile, instance)
assert metadata == [("x-goog-request-params", expected)]

@pytest.mark.parametrize(
"table,profile,instance",
[
("table", None, "instance"),
("table", "profile", "instance"),
(None, None, None),
],
)
def test__make_metadata_invalid_params(self, table, profile, instance):
with pytest.raises(ValueError):
_helpers._make_metadata(table, profile, instance)


class TestAttemptTimeoutGenerator:
@pytest.mark.parametrize(
"request_t,operation_t,expected_list",
Expand Down

0 comments on commit dcd59c3

Please sign in to comment.