From ef454dfdf164ae44ebe1839318c5892fc6ff6a7a Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 18 Dec 2023 16:06:00 -0800 Subject: [PATCH 1/3] added system test for exception raised on error --- tests/system/data/test_system.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/system/data/test_system.py b/tests/system/data/test_system.py index fb0d9eb82..a1eb9657a 100644 --- a/tests/system/data/test_system.py +++ b/tests/system/data/test_system.py @@ -238,6 +238,28 @@ async def test_bulk_mutations_set_cell(client, table, temp_rows): assert (await _retrieve_cell_value(table, row_key)) == new_value +@pytest.mark.asyncio +async def test_bulk_mutations_raise_exception(client, table): + """ + If an invalid mutation is passed, an exception should be raised + """ + from google.cloud.bigtable.data.mutations import RowMutationEntry, SetCell + from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup + from google.cloud.bigtable.data.exceptions import FailedMutationEntryError + + row_key = uuid.uuid4().hex.encode() + mutation = SetCell(family='nonexistent', qualifier=b"test-qualifier", new_value=b"") + bulk_mutation = RowMutationEntry(row_key, [mutation]) + + with pytest.raises(MutationsExceptionGroup) as exc: + await table.bulk_mutate_rows([bulk_mutation]) + assert len(exc.value.exceptions) == 1 + entry_error = exc.value.exceptions[0] + assert isinstance(entry_error, FailedMutationEntryError) + assert entry_error.index == 0 + assert entry_error.entry == bulk_mutation + + @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") @retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) From 8fcd2ee2aeb48a050a7531ed77a43fbcf88da0c1 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 18 Dec 2023 16:08:25 -0800 Subject: [PATCH 2/3] added test for mutation recovery --- .../bigtable/data/_async/_mutate_rows.py | 3 ++ tests/unit/data/_async/test_client.py | 28 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index d4ffdee22..7d1144553 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -189,6 +189,9 @@ async def _run_attempt(self): if result.status.code != 0: # mutation failed; update error list (and remaining_indices if retryable) self._handle_entry_error(orig_idx, entry_error) + elif orig_idx in self.errors: + # mutation succeeded; remove from error list + del self.errors[orig_idx] # remove processed entry from active list del active_request_indices[result.index] except Exception as exc: diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 46080e497..24e85a29d 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -2659,6 +2659,34 @@ async def test_bulk_mutate_error_index(self): assert isinstance(cause.exceptions[1], DeadlineExceeded) assert isinstance(cause.exceptions[2], FailedPrecondition) + @pytest.mark.asyncio + async def test_bulk_mutate_error_recovery(self): + """ + If an error occurs, then resolves, no exception should be raised + """ + from google.api_core.exceptions import DeadlineExceeded + + async with self._make_client(project="project") as client: + table = client.get_table("instance", "table") + with mock.patch.object( + client._gapic_client, "mutate_rows" + ) as mock_gapic: + # fail with a retryable error, then a non-retryable one + mock_gapic.side_effect = [ + self._mock_response([DeadlineExceeded("mock")]), + self._mock_response([None]), + ] + mutation = mutations.SetCell( + "family", b"qualifier", b"value", timestamp_micros=123 + ) + entries = [ + mutations.RowMutationEntry( + (f"row_key_{i}").encode(), [mutation] + ) + for i in range(3) + ] + await table.bulk_mutate_rows(entries, operation_timeout=1000) + class TestCheckAndMutateRow: def _make_client(self, *args, **kwargs): From 073d37d92d5b14e774729d75d248f4d819805d87 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 18 Dec 2023 16:10:09 -0800 Subject: [PATCH 3/3] ran blacken --- tests/system/data/test_system.py | 2 +- tests/unit/data/_async/test_client.py | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/system/data/test_system.py b/tests/system/data/test_system.py index a1eb9657a..b8423279a 100644 --- a/tests/system/data/test_system.py +++ b/tests/system/data/test_system.py @@ -248,7 +248,7 @@ async def test_bulk_mutations_raise_exception(client, table): from google.cloud.bigtable.data.exceptions import FailedMutationEntryError row_key = uuid.uuid4().hex.encode() - mutation = SetCell(family='nonexistent', qualifier=b"test-qualifier", new_value=b"") + mutation = SetCell(family="nonexistent", qualifier=b"test-qualifier", new_value=b"") bulk_mutation = RowMutationEntry(row_key, [mutation]) with pytest.raises(MutationsExceptionGroup) as exc: diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 24e85a29d..9a12abe9b 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -2668,9 +2668,7 @@ async def test_bulk_mutate_error_recovery(self): async with self._make_client(project="project") as client: table = client.get_table("instance", "table") - with mock.patch.object( - client._gapic_client, "mutate_rows" - ) as mock_gapic: + with mock.patch.object(client._gapic_client, "mutate_rows") as mock_gapic: # fail with a retryable error, then a non-retryable one mock_gapic.side_effect = [ self._mock_response([DeadlineExceeded("mock")]), @@ -2680,9 +2678,7 @@ async def test_bulk_mutate_error_recovery(self): "family", b"qualifier", b"value", timestamp_micros=123 ) entries = [ - mutations.RowMutationEntry( - (f"row_key_{i}").encode(), [mutation] - ) + mutations.RowMutationEntry((f"row_key_{i}").encode(), [mutation]) for i in range(3) ] await table.bulk_mutate_rows(entries, operation_timeout=1000)