diff --git a/google/cloud/bigtable/data/_async/_mutate_rows.py b/google/cloud/bigtable/data/_async/_mutate_rows.py index d4ffdee22..7d1144553 100644 --- a/google/cloud/bigtable/data/_async/_mutate_rows.py +++ b/google/cloud/bigtable/data/_async/_mutate_rows.py @@ -189,6 +189,9 @@ async def _run_attempt(self): if result.status.code != 0: # mutation failed; update error list (and remaining_indices if retryable) self._handle_entry_error(orig_idx, entry_error) + elif orig_idx in self.errors: + # mutation succeeded; remove from error list + del self.errors[orig_idx] # remove processed entry from active list del active_request_indices[result.index] except Exception as exc: diff --git a/tests/system/data/test_system.py b/tests/system/data/test_system.py index fb0d9eb82..b8423279a 100644 --- a/tests/system/data/test_system.py +++ b/tests/system/data/test_system.py @@ -238,6 +238,28 @@ async def test_bulk_mutations_set_cell(client, table, temp_rows): assert (await _retrieve_cell_value(table, row_key)) == new_value +@pytest.mark.asyncio +async def test_bulk_mutations_raise_exception(client, table): + """ + If an invalid mutation is passed, an exception should be raised + """ + from google.cloud.bigtable.data.mutations import RowMutationEntry, SetCell + from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup + from google.cloud.bigtable.data.exceptions import FailedMutationEntryError + + row_key = uuid.uuid4().hex.encode() + mutation = SetCell(family="nonexistent", qualifier=b"test-qualifier", new_value=b"") + bulk_mutation = RowMutationEntry(row_key, [mutation]) + + with pytest.raises(MutationsExceptionGroup) as exc: + await table.bulk_mutate_rows([bulk_mutation]) + assert len(exc.value.exceptions) == 1 + entry_error = exc.value.exceptions[0] + assert isinstance(entry_error, FailedMutationEntryError) + assert entry_error.index == 0 + assert entry_error.entry == bulk_mutation + + @pytest.mark.usefixtures("client") @pytest.mark.usefixtures("table") @retry.AsyncRetry(predicate=retry.if_exception_type(ClientError), initial=1, maximum=5) diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 46080e497..9a12abe9b 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -2659,6 +2659,30 @@ async def test_bulk_mutate_error_index(self): assert isinstance(cause.exceptions[1], DeadlineExceeded) assert isinstance(cause.exceptions[2], FailedPrecondition) + @pytest.mark.asyncio + async def test_bulk_mutate_error_recovery(self): + """ + If an error occurs, then resolves, no exception should be raised + """ + from google.api_core.exceptions import DeadlineExceeded + + async with self._make_client(project="project") as client: + table = client.get_table("instance", "table") + with mock.patch.object(client._gapic_client, "mutate_rows") as mock_gapic: + # fail with a retryable error, then a non-retryable one + mock_gapic.side_effect = [ + self._mock_response([DeadlineExceeded("mock")]), + self._mock_response([None]), + ] + mutation = mutations.SetCell( + "family", b"qualifier", b"value", timestamp_micros=123 + ) + entries = [ + mutations.RowMutationEntry((f"row_key_{i}").encode(), [mutation]) + for i in range(3) + ] + await table.bulk_mutate_rows(entries, operation_timeout=1000) + class TestCheckAndMutateRow: def _make_client(self, *args, **kwargs):