Skip to content

Commit

Permalink
chore: merge branch 'main'
Browse files Browse the repository at this point in the history
  • Loading branch information
daniel-sanche committed Jan 18, 2024
2 parents f0d75de + 1859e67 commit cdfcc2a
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 10 deletions.
4 changes: 2 additions & 2 deletions .github/.OwlBot.lock.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
# limitations under the License.
docker:
image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest
digest: sha256:230f7fe8a0d2ed81a519cfc15c6bb11c5b46b9fb449b8b1219b3771bcb520ad2
# created: 2023-12-09T15:16:25.430769578Z
digest: sha256:346ab2efb51649c5dde7756cbbdc60dd394852ba83b9bbffc292a63549f33c17
# created: 2023-12-14T22:17:57.611773021Z
2 changes: 1 addition & 1 deletion .github/workflows/system_emulated.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
python-version: '3.8'

- name: Setup GCloud SDK
uses: google-github-actions/[email protected].0
uses: google-github-actions/[email protected].1

- name: Install / run Nox
run: |
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/unittest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ jobs:
run: |
nox -s unit-${{ matrix.python }}
- name: Upload coverage results
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: coverage-artifacts
name: coverage-artifact-${{ matrix.python }}
path: .coverage-${{ matrix.python }}

cover:
Expand All @@ -47,11 +47,11 @@ jobs:
python -m pip install --upgrade setuptools pip wheel
python -m pip install coverage
- name: Download coverage results
uses: actions/download-artifact@v3
uses: actions/download-artifact@v4
with:
name: coverage-artifacts
path: .coverage-results/
- name: Report coverage results
run: |
coverage combine .coverage-results/.coverage*
find .coverage-results -type f -name '*.zip' -exec unzip {} \;
coverage combine .coverage-results/**/.coverage*
coverage report --show-missing --fail-under=100
7 changes: 7 additions & 0 deletions google/cloud/bigtable_admin_v2/types/bigtable_table_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,9 @@ class ModifyColumnFamiliesRequest(proto.Message):
earlier modifications can be masked by later
ones (in the case of repeated updates to the
same family, for example).
ignore_warnings (bool):
Optional. If true, ignore safety checks when
modifying the column families.
"""

class Modification(proto.Message):
Expand Down Expand Up @@ -662,6 +665,10 @@ class Modification(proto.Message):
number=2,
message=Modification,
)
ignore_warnings: bool = proto.Field(
proto.BOOL,
number=3,
)


class GenerateConsistencyTokenRequest(proto.Message):
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/bigtable_v2/services/bigtable/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from google.api_core.client_options import ClientOptions
from google.api_core import exceptions as core_exceptions
from google.api_core import gapic_v1
from google.api_core import retry as retries
from google.api_core import retry_async as retries
from google.auth import credentials as ga_credentials # type: ignore
from google.oauth2 import service_account # type: ignore

Expand Down
16 changes: 16 additions & 0 deletions google/cloud/bigtable_v2/types/feature_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ class FeatureFlags(proto.Message):
Notify the server that the client supports the
last_scanned_row field in ReadRowsResponse for long-running
scans.
routing_cookie (bool):
Notify the server that the client supports
using encoded routing cookie strings to retry
requests with.
retry_info (bool):
Notify the server that the client supports
using retry info back off durations to retry
requests with.
"""

reverse_scans: bool = proto.Field(
Expand All @@ -77,6 +85,14 @@ class FeatureFlags(proto.Message):
proto.BOOL,
number=4,
)
routing_cookie: bool = proto.Field(
proto.BOOL,
number=6,
)
retry_info: bool = proto.Field(
proto.BOOL,
number=7,
)


__all__ = tuple(sorted(__protobuf__.manifest))
2 changes: 1 addition & 1 deletion scripts/fixup_bigtable_admin_v2_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class bigtable_adminCallTransformer(cst.CSTTransformer):
'list_instances': ('parent', 'page_token', ),
'list_snapshots': ('parent', 'page_size', 'page_token', ),
'list_tables': ('parent', 'view', 'page_size', 'page_token', ),
'modify_column_families': ('name', 'modifications', ),
'modify_column_families': ('name', 'modifications', 'ignore_warnings', ),
'partial_update_cluster': ('cluster', 'update_mask', ),
'partial_update_instance': ('instance', 'update_mask', ),
'restore_table': ('parent', 'table_id', 'backup', ),
Expand Down
269 changes: 269 additions & 0 deletions tests/unit/test_batcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import mock
import time

import pytest

from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.batcher import (
_FlowControl,
MutationsBatcher,
MutationsBatchError,
)

TABLE_ID = "table-id"
TABLE_NAME = "/tables/" + TABLE_ID


def test_mutation_batcher_constructor():
    """The batcher should keep a reference to the table it wraps."""
    fake_table = _Table(TABLE_NAME)
    with MutationsBatcher(fake_table) as batcher:
        assert batcher.table is fake_table


def test_mutation_batcher_w_user_callback():
    """A batch_completed_callback should receive the batch responses."""
    fake_table = _Table(TABLE_NAME)

    def record_count(response):
        # Stash the response size on the function object for later inspection.
        record_count.count = len(response)

    with MutationsBatcher(
        fake_table, flush_count=1, batch_completed_callback=record_count
    ) as batcher:
        batch = [
            DirectRow(row_key=key)
            for key in (b"row_key", b"row_key_2", b"row_key_3", b"row_key_4")
        ]
        batcher.mutate_rows(batch)

    assert record_count.count == 4


def test_mutation_batcher_mutate_row():
    """mutate_rows on a small batch should result in one table call."""
    fake_table = _Table(TABLE_NAME)
    with MutationsBatcher(table=fake_table) as batcher:
        batch = [
            DirectRow(row_key=key)
            for key in (b"row_key", b"row_key_2", b"row_key_3", b"row_key_4")
        ]
        batcher.mutate_rows(batch)

    assert fake_table.mutation_calls == 1


def test_mutation_batcher_mutate():
    """A single row with several cells should be flushed in one call."""
    fake_table = _Table(TABLE_NAME)
    with MutationsBatcher(table=fake_table) as batcher:
        row = DirectRow(row_key=b"row_key")
        for value, column in enumerate((b"c1", b"c2", b"c3", b"c4"), start=1):
            row.set_cell("cf1", column, value)
        batcher.mutate(row)

    assert fake_table.mutation_calls == 1


def test_mutation_batcher_flush_w_no_rows():
    """Flushing an empty batcher should not touch the table at all."""
    fake_table = _Table(TABLE_NAME)
    with MutationsBatcher(table=fake_table) as batcher:
        batcher.flush()

    assert fake_table.mutation_calls == 0


def test_mutation_batcher_mutate_w_max_flush_count():
    """Reaching flush_count should trigger exactly one flush."""
    fake_table = _Table(TABLE_NAME)
    with MutationsBatcher(table=fake_table, flush_count=3) as batcher:
        for key in (b"row_key_1", b"row_key_2", b"row_key_3"):
            batcher.mutate(DirectRow(row_key=key))

    assert fake_table.mutation_calls == 1


@mock.patch("google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", new=3)
def test_mutation_batcher_mutate_w_max_mutations():
    """Hitting the outstanding-element cap should force a single flush."""
    fake_table = _Table(TABLE_NAME)
    with MutationsBatcher(table=fake_table) as batcher:
        row = DirectRow(row_key=b"row_key")
        for value, column in enumerate((b"c1", b"c2", b"c3"), start=1):
            row.set_cell("cf1", column, value)
        batcher.mutate(row)

    assert fake_table.mutation_calls == 1


def test_mutation_batcher_mutate_w_max_row_bytes():
    """Reaching max_row_bytes should force a single flush."""
    fake_table = _Table(TABLE_NAME)
    with MutationsBatcher(
        table=fake_table, max_row_bytes=3 * 1024 * 1024
    ) as batcher:
        # Three 1 MiB cells together hit the 3 MiB threshold.
        one_mebibyte = b"1" * (1 * 1024 * 1024)
        row = DirectRow(row_key=b"row_key")
        for column in (b"c1", b"c2", b"c3"):
            row.set_cell("cf1", column, one_mebibyte)
        batcher.mutate(row)

    assert fake_table.mutation_calls == 1


def test_mutations_batcher_flushed_when_closed():
    """Pending rows below the size threshold should be sent on close()."""
    fake_table = _Table(TABLE_NAME)
    batcher = MutationsBatcher(table=fake_table, max_row_bytes=3 * 1024 * 1024)

    one_mebibyte = b"1" * (1 * 1024 * 1024)
    row = DirectRow(row_key=b"row_key")
    row.set_cell("cf1", b"c1", one_mebibyte)
    row.set_cell("cf1", b"c2", one_mebibyte)

    batcher.mutate(row)
    # 2 MiB is under the 3 MiB limit, so nothing has been flushed yet.
    assert fake_table.mutation_calls == 0

    batcher.close()
    assert fake_table.mutation_calls == 1


def test_mutations_batcher_context_manager_flushed_when_closed():
    """Leaving the context manager should flush pending mutations."""
    fake_table = _Table(TABLE_NAME)
    with MutationsBatcher(
        table=fake_table, max_row_bytes=3 * 1024 * 1024
    ) as batcher:
        one_mebibyte = b"1" * (1 * 1024 * 1024)
        row = DirectRow(row_key=b"row_key")
        row.set_cell("cf1", b"c1", one_mebibyte)
        row.set_cell("cf1", b"c2", one_mebibyte)
        batcher.mutate(row)

    assert fake_table.mutation_calls == 1


@mock.patch("google.cloud.bigtable.batcher.MutationsBatcher.flush")
def test_mutations_batcher_flush_interval(mocked_flush):
    """The background timer should call flush() once per flush_interval.

    NOTE(review): this test relies on real wall-clock sleeps (0.4s + 0.1s
    around a 0.5s interval), which makes it slow and potentially flaky on a
    loaded CI machine — consider driving the timer with a fake clock.
    """
    table = _Table(TABLE_NAME)
    flush_interval = 0.5
    mutation_batcher = MutationsBatcher(table=table, flush_interval=flush_interval)

    # The internal timer must be armed with the requested interval.
    assert mutation_batcher._timer.interval == flush_interval
    mocked_flush.assert_not_called()

    # Just before the interval elapses: no flush yet.
    time.sleep(0.4)
    mocked_flush.assert_not_called()

    # Crossing the interval boundary triggers exactly one flush.
    time.sleep(0.1)
    mocked_flush.assert_called_once_with()

    mutation_batcher.close()


def test_mutations_batcher_response_with_error_codes():
    """Non-OK statuses in the response should surface as MutationsBatchError."""
    from google.rpc.status_pb2 import Status

    error_statuses = [Status(code=1), Status(code=5)]

    with mock.patch("tests.unit.test_batcher._Table") as mocked_table:
        fake_table = mocked_table.return_value
        fake_table.mutate_rows.return_value = error_statuses
        batcher = MutationsBatcher(table=fake_table)

        batcher.mutate_rows(
            [DirectRow(row_key=b"row_key"), DirectRow(row_key=b"row_key")]
        )
        with pytest.raises(MutationsBatchError) as exc:
            batcher.close()
        assert exc.value.message == "Errors in batch mutations."
        assert len(exc.value.exc) == 2

        assert exc.value.exc[0].message == error_statuses[0].message
        assert exc.value.exc[1].message == error_statuses[1].message


def test_flow_control_event_is_set_when_not_blocked():
    """With no inflight work, the flow-control event should be set."""
    flow_control = _FlowControl()
    flow_control.set_flow_control_status()

    assert flow_control.event.is_set()


def test_flow_control_event_is_not_set_when_blocked():
    """Maxed-out inflight counters should leave the event cleared."""
    flow_control = _FlowControl()
    # Saturate both limits so the flow controller must block.
    flow_control.inflight_mutations = flow_control.max_mutations
    flow_control.inflight_size = flow_control.max_mutation_bytes
    flow_control.set_flow_control_status()

    assert not flow_control.event.is_set()


@mock.patch("concurrent.futures.ThreadPoolExecutor.submit")
def test_flush_async_batch_count(mocked_executor_submit):
    """_flush_async with flush_count=2 should split 5 rows into 3 batches."""
    fake_table = _Table(TABLE_NAME)
    batcher = MutationsBatcher(table=fake_table, flush_count=2)

    payload = b"1" * (1 * 1024 * 1024)
    for index in range(5):
        row = DirectRow(row_key=f"row_key_{index}")
        row.set_cell("cf1", b"c1", payload)
        batcher.mutate(row)
    batcher._flush_async()

    # Two full batches of 2 plus a trailing single-row batch.
    assert mocked_executor_submit.call_count == 3


class _Instance(object):
def __init__(self, client=None):
self._client = client


class _Table(object):
    """Table double that counts mutate_rows calls and always succeeds."""

    def __init__(self, name, client=None):
        self.name = name
        self._instance = _Instance(client)
        # Number of mutate_rows invocations observed so far.
        self.mutation_calls = 0

    def mutate_rows(self, rows):
        """Pretend to apply *rows* and return one OK status per row."""
        # Imported lazily so merely constructing the double needs no protobuf.
        from google.rpc.status_pb2 import Status

        self.mutation_calls += 1
        return [Status(code=0) for _row in rows]

0 comments on commit cdfcc2a

Please sign in to comment.