Skip to content

Commit

Permalink
fixed error recovery bug in _mutate_rows
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Dec 14, 2023
1 parent 65b5485 commit 0e7887e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 62 deletions.
2 changes: 2 additions & 0 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ async def _run_attempt(self):
if result.status.code != 0:
# mutation failed; update error list (and remaining_indices if retryable)
self._handle_entry_error(orig_idx, entry_error)
elif orig_idx in self.errors:
del self.errors[orig_idx]
# remove processed entry from active list
del active_request_indices[result.index]
finally:
Expand Down
127 changes: 65 additions & 62 deletions tests/unit/data/_async/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2328,31 +2328,6 @@ async def test_mutate_row_non_retryable_errors(self, non_retryable_exception):
"row_key", mutation, operation_timeout=0.2
)

@pytest.mark.parametrize("include_app_profile", [True, False])
@pytest.mark.asyncio
async def test_mutate_row_metadata(self, include_app_profile):
"""request should attach metadata headers"""
profile = "profile" if include_app_profile else None
async with self._make_client() as client:
table = client.get_table("i", "t", app_profile_id=profile)
with mock.patch.object(
client._gapic_client, "mutate_row"
) as gapic_mock:
gapic_mock.return_value = mock_grpc_call()
await table.mutate_row("rk", mock.Mock())
kwargs = gapic_mock.call_args_list[0].kwargs
metadata = kwargs["metadata"]
goog_metadata = None
for key, value in metadata:
if key == "x-goog-request-params":
goog_metadata = value
assert goog_metadata is not None, "x-goog-request-params not found"
assert "table_name=" + table.table_name in goog_metadata
if include_app_profile:
assert "app_profile_id=profile" in goog_metadata
else:
assert "app_profile_id=" not in goog_metadata

@pytest.mark.parametrize("mutations", [[], None])
@pytest.mark.asyncio
async def test_mutate_row_no_mutations(self, mutations):
Expand Down Expand Up @@ -2604,26 +2579,26 @@ async def test_bulk_mutate_rows_non_idempotent_retryable_errors(
)

async with self._make_client(project="project") as client:
async with client.get_table("instance", "table") as table:
with mock.patch.object(
client._gapic_client, "mutate_rows"
) as mock_gapic:
mock_gapic.side_effect = lambda *a, **k: self._mock_response(
[retryable_exception("mock")]
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_rows"
) as mock_gapic:
mock_gapic.side_effect = lambda *a, **k: self._mock_response(
[retryable_exception("mock")]
)
with pytest.raises(MutationsExceptionGroup) as e:
mutation = mutations.SetCell(
"family", b"qualifier", b"value", -1
)
with pytest.raises(MutationsExceptionGroup) as e:
mutation = mutations.SetCell(
"family", b"qualifier", b"value", -1
)
entry = mutations.RowMutationEntry(b"row_key", [mutation])
assert mutation.is_idempotent() is False
await table.bulk_mutate_rows([entry], operation_timeout=0.2)
assert len(e.value.exceptions) == 1
failed_exception = e.value.exceptions[0]
assert isinstance(failed_exception, FailedMutationEntryError)
assert "non-idempotent" in str(failed_exception)
cause = failed_exception.__cause__
assert isinstance(cause, retryable_exception)
entry = mutations.RowMutationEntry(b"row_key", [mutation])
assert mutation.is_idempotent() is False
await table.bulk_mutate_rows([entry], operation_timeout=0.2)
assert len(e.value.exceptions) == 1
failed_exception = e.value.exceptions[0]
assert isinstance(failed_exception, FailedMutationEntryError)
assert "non-idempotent" in str(failed_exception)
cause = failed_exception.__cause__
assert isinstance(cause, retryable_exception)

@pytest.mark.parametrize(
"non_retryable_exception",
Expand All @@ -2646,24 +2621,24 @@ async def test_bulk_mutate_rows_non_retryable_errors(self, non_retryable_excepti
)

async with self._make_client(project="project") as client:
async with client.get_table("instance", "table") as table:
with mock.patch.object(
client._gapic_client, "mutate_rows"
) as mock_gapic:
mock_gapic.side_effect = non_retryable_exception("mock")
with pytest.raises(MutationsExceptionGroup) as e:
mutation = mutations.SetCell(
"family", b"qualifier", b"value", timestamp_micros=123
)
entry = mutations.RowMutationEntry(b"row_key", [mutation])
assert mutation.is_idempotent() is True
await table.bulk_mutate_rows([entry], operation_timeout=0.2)
assert len(e.value.exceptions) == 1
failed_exception = e.value.exceptions[0]
assert isinstance(failed_exception, FailedMutationEntryError)
assert "non-idempotent" not in str(failed_exception)
cause = failed_exception.__cause__
assert isinstance(cause, non_retryable_exception)
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_rows"
) as mock_gapic:
mock_gapic.side_effect = non_retryable_exception("mock")
with pytest.raises(MutationsExceptionGroup) as e:
mutation = mutations.SetCell(
"family", b"qualifier", b"value", timestamp_micros=123
)
entry = mutations.RowMutationEntry(b"row_key", [mutation])
assert mutation.is_idempotent() is True
await table.bulk_mutate_rows([entry], operation_timeout=0.2)
assert len(e.value.exceptions) == 1
failed_exception = e.value.exceptions[0]
assert isinstance(failed_exception, FailedMutationEntryError)
assert "non-idempotent" not in str(failed_exception)
cause = failed_exception.__cause__
assert isinstance(cause, non_retryable_exception)

@pytest.mark.asyncio
async def test_bulk_mutate_error_index(self):
Expand Down Expand Up @@ -2716,6 +2691,34 @@ async def test_bulk_mutate_error_index(self):
assert isinstance(cause.exceptions[1], DeadlineExceeded)
assert isinstance(cause.exceptions[2], FailedPrecondition)

@pytest.mark.asyncio
async def test_bulk_mutate_error_recovery(self):
"""
If an error occurs, then resolves, no exception should be raised
"""
from google.api_core.exceptions import DeadlineExceeded

async with self._make_client(project="project") as client:
table = client.get_table("instance", "table")
with mock.patch.object(
client._gapic_client, "mutate_rows"
) as mock_gapic:
# fail with a retryable error, then a non-retryable one
mock_gapic.side_effect = [
self._mock_response([DeadlineExceeded("mock")]),
self._mock_response([None]),
]
mutation = mutations.SetCell(
"family", b"qualifier", b"value", timestamp_micros=123
)
entries = [
mutations.RowMutationEntry(
(f"row_key_{i}").encode(), [mutation]
)
for i in range(3)
]
await table.bulk_mutate_rows(entries, operation_timeout=1000)


class TestCheckAndMutateRow:
def _make_client(self, *args, **kwargs):
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/data/_metrics/test_rpcs_instrumented.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ async def test_rpc_instrumented_multiple_attempts(fn_name, fn_args, gapic_fn, is
from google.cloud.bigtable.data import TableAsync
from google.cloud.bigtable.data import BigtableDataClientAsync
from google.api_core.exceptions import Aborted
from google.cloud.bigtable_v2.types import MutateRowsResponse
from google.rpc.status_pb2 import Status

with mock.patch(f"google.cloud.bigtable_v2.BigtableAsyncClient.{gapic_fn}") as gapic_mock:
if is_unary:
Expand All @@ -139,6 +141,9 @@ async def test_rpc_instrumented_multiple_attempts(fn_name, fn_args, gapic_fn, is
else:
unary_response = None
grpc_call = mock_grpc_call(unary_response=unary_response)
if gapic_fn == "mutate_rows":
# patch response to send success
grpc_call.stream_response = [MutateRowsResponse(entries=[MutateRowsResponse.Entry(index=0, status=Status(code=0))])]
gapic_mock.side_effect = [Aborted("first attempt failed"), grpc_call]
async with BigtableDataClientAsync() as client:
table = TableAsync(client, "instance-id", "table-id")
Expand Down

0 comments on commit 0e7887e

Please sign in to comment.