Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] feat: add new sync data client #957

Closed
wants to merge 55 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
51d3557
pulled in sync generator from previous branch
daniel-sanche Apr 9, 2024
fb2ae39
simplified generator
daniel-sanche Apr 9, 2024
e788933
refactoring
daniel-sanche Apr 9, 2024
7c740ef
adding better support for yaml config
daniel-sanche Apr 9, 2024
ff0458d
updated import maps
daniel-sanche Apr 9, 2024
fcf2a2f
added back name_replacements
daniel-sanche Apr 9, 2024
51d9c7f
added configs for all async classes
daniel-sanche Apr 9, 2024
3fefea4
removed concrete class map
daniel-sanche Apr 9, 2024
b6a89dd
improced handling of unexpected asyncio tasks
daniel-sanche Apr 9, 2024
4d18e58
added concrete classes
daniel-sanche Apr 9, 2024
70fd731
keep docstrings on pass
daniel-sanche Apr 9, 2024
5c96cd3
improved import replacement code
daniel-sanche Apr 10, 2024
86b2e25
renamed the two types of replacements
daniel-sanche Apr 10, 2024
db16d0f
use concrete names in generator
daniel-sanche Apr 10, 2024
8c9676d
fixed import paths
daniel-sanche Apr 10, 2024
d4ca864
avoid circular import
daniel-sanche Apr 10, 2024
2d7b88f
improved string typing
daniel-sanche Apr 10, 2024
2c950f2
got client and table working
daniel-sanche Apr 10, 2024
f73a225
moved config into yaml file
daniel-sanche Apr 11, 2024
92d5608
moved save_path into yaml
daniel-sanche Apr 11, 2024
7b40f27
fixed sync emulator channel
daniel-sanche Apr 11, 2024
b3546d8
moved acceptance tests to async folder
daniel-sanche Apr 11, 2024
cbec8f2
broke module_replacements into asyncio and imports
daniel-sanche Apr 11, 2024
1af60be
update retries_async
daniel-sanche Apr 11, 2024
e486ad5
replaced asyncio.Task
daniel-sanche Apr 11, 2024
1974a50
removed use of async generator
daniel-sanche Apr 11, 2024
64ec3d5
fixed type issue
daniel-sanche Apr 11, 2024
f67ebc4
got _mutate_rows tests passing
daniel-sanche Apr 11, 2024
2ebbb72
got sync read_rows tests passing
daniel-sanche Apr 11, 2024
0bfb257
got flow control tests passing
daniel-sanche Apr 12, 2024
5b9e967
made some fixes to mutations batcher
daniel-sanche Apr 12, 2024
717c401
improved modularity of mutation batcher tests
daniel-sanche Apr 12, 2024
aa472c1
fixed TableAssync references
daniel-sanche Apr 12, 2024
06f0586
added event for batcher lock
daniel-sanche Apr 12, 2024
d097aa1
got mutation batcher tests passing
daniel-sanche Apr 12, 2024
8df5920
did some refactoring of client tests
daniel-sanche Apr 13, 2024
dcc0ab6
added sharded read rows sync
daniel-sanche Apr 13, 2024
10c565f
made tweaks to read rows sharded
daniel-sanche Apr 13, 2024
2cdf0ab
got client tests passing
daniel-sanche Apr 16, 2024
ef892bc
moved more logic into generated portion
daniel-sanche Apr 16, 2024
fe572bc
got table tests passing
daniel-sanche Apr 16, 2024
1d428a6
got all client tests passing
daniel-sanche Apr 16, 2024
751c676
fixed slow test
daniel-sanche Apr 17, 2024
97f4797
got async read_rows acceptance working again
daniel-sanche Apr 17, 2024
92f2132
got acceptance tests working
daniel-sanche Apr 17, 2024
c006482
ran blacken
daniel-sanche Apr 17, 2024
c39d26a
fixed some lint issues
daniel-sanche Apr 17, 2024
d5903e2
fixed mypy issues
daniel-sanche Apr 18, 2024
f9862a2
refactored system tests into class
daniel-sanche Apr 18, 2024
5cb4e82
fixed retry type
daniel-sanche Apr 18, 2024
46ef676
renamed test file
daniel-sanche Apr 18, 2024
44840f3
got system tests generated
daniel-sanche Apr 18, 2024
3c33954
fixed possible flake
daniel-sanche Apr 18, 2024
7c9c3bd
changed class order to match async
daniel-sanche Apr 18, 2024
7661fec
renamed class
daniel-sanche Apr 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion google/cloud/bigtable/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@

from google.cloud.bigtable.data._async.mutations_batcher import MutationsBatcherAsync

from google.cloud.bigtable.data._sync.client import BigtableDataClient
from google.cloud.bigtable.data._sync.client import Table

from google.cloud.bigtable.data._sync.mutations_batcher import MutationsBatcher

from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.data.read_rows_query import RowRange
from google.cloud.bigtable.data.row import Row
Expand Down Expand Up @@ -48,12 +53,15 @@
__version__: str = package_version.__version__

__all__ = (
"BigtableDataClient",
"Table",
"MutationsBatcher",
"BigtableDataClientAsync",
"TableAsync",
"MutationsBatcherAsync",
"RowKeySamples",
"ReadRowsQuery",
"RowRange",
"MutationsBatcherAsync",
"Mutation",
"RowMutationEntry",
"SetCell",
Expand Down
6 changes: 4 additions & 2 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ def __init__(
bt_exceptions._MutateRowsIncomplete,
)
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
self._operation = retries.retry_target_async(
# Note: _operation could be a raw coroutine, but using a lambda
# wrapper helps unify with sync code
self._operation = lambda: retries.retry_target_async(
self._run_attempt,
self.is_retryable,
sleep_generator,
Expand All @@ -125,7 +127,7 @@ async def start(self):
"""
try:
# trigger mutate_rows
await self._operation
await self._operation()
except Exception as exc:
# exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
incomplete_indices = self.remaining_indices.copy()
Expand Down
21 changes: 8 additions & 13 deletions google/cloud/bigtable/data/_async/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

from typing import (
TYPE_CHECKING,
AsyncGenerator,
AsyncIterable,
Awaitable,
Sequence,
Expand All @@ -32,9 +31,7 @@
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.data.exceptions import InvalidChunk
from google.cloud.bigtable.data.exceptions import _RowSetComplete
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _retry_exception_factory
from google.cloud.bigtable.data import _helpers

from google.api_core import retry as retries
from google.api_core.retry import exponential_sleep_generator
Expand Down Expand Up @@ -80,7 +77,7 @@ def __init__(
attempt_timeout: float,
retryable_exceptions: Sequence[type[Exception]] = (),
):
self.attempt_timeout_gen = _attempt_timeout_generator(
self.attempt_timeout_gen = _helpers._attempt_timeout_generator(
attempt_timeout, operation_timeout
)
self.operation_timeout = operation_timeout
Expand All @@ -94,14 +91,14 @@ def __init__(
self.request = query._to_pb(table)
self.table = table
self._predicate = retries.if_exception_type(*retryable_exceptions)
self._metadata = _make_metadata(
self._metadata = _helpers._make_metadata(
table.table_name,
table.app_profile_id,
)
self._last_yielded_row_key: bytes | None = None
self._remaining_count: int | None = self.request.rows_limit or None

def start_operation(self) -> AsyncGenerator[Row, None]:
def start_operation(self) -> AsyncIterable[Row]:
"""
Start the read_rows operation, retrying on retryable errors.
"""
Expand All @@ -110,10 +107,10 @@ def start_operation(self) -> AsyncGenerator[Row, None]:
self._predicate,
exponential_sleep_generator(0.01, 60, multiplier=2),
self.operation_timeout,
exception_factory=_retry_exception_factory,
exception_factory=_helpers._retry_exception_factory,
)

def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
def _read_rows_attempt(self) -> AsyncIterable[Row]:
"""
Attempt a single read_rows rpc call.
This function is intended to be wrapped by retry logic,
Expand Down Expand Up @@ -148,7 +145,7 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:

async def chunk_stream(
self, stream: Awaitable[AsyncIterable[ReadRowsResponsePB]]
) -> AsyncGenerator[ReadRowsResponsePB.CellChunk, None]:
) -> AsyncIterable[ReadRowsResponsePB.CellChunk]:
"""
process chunks out of raw read_rows stream
"""
Expand Down Expand Up @@ -193,9 +190,7 @@ async def chunk_stream(
current_key = None

@staticmethod
async def merge_rows(
chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None
):
async def merge_rows(chunks: AsyncIterable[ReadRowsResponsePB.CellChunk] | None):
"""
Merge chunks into rows
"""
Expand Down
Loading
Loading