Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] chore: use cross sync for async methods #991

Closed
wants to merge 176 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
176 commits
Select commit Hold shift + click to select a range
51d3557
pulled in sync generator from previous branch
daniel-sanche Apr 9, 2024
fb2ae39
simplified generator
daniel-sanche Apr 9, 2024
e788933
refactoring
daniel-sanche Apr 9, 2024
7c740ef
adding better support for yaml config
daniel-sanche Apr 9, 2024
ff0458d
updated import maps
daniel-sanche Apr 9, 2024
fcf2a2f
added back name_replacements
daniel-sanche Apr 9, 2024
51d9c7f
added configs for all async classes
daniel-sanche Apr 9, 2024
3fefea4
removed concrete class map
daniel-sanche Apr 9, 2024
b6a89dd
improced handling of unexpected asyncio tasks
daniel-sanche Apr 9, 2024
4d18e58
added concrete classes
daniel-sanche Apr 9, 2024
70fd731
keep docstrings on pass
daniel-sanche Apr 9, 2024
5c96cd3
improved import replacement code
daniel-sanche Apr 10, 2024
86b2e25
renamed the two types of replacements
daniel-sanche Apr 10, 2024
db16d0f
use concrete names in generator
daniel-sanche Apr 10, 2024
8c9676d
fixed import paths
daniel-sanche Apr 10, 2024
d4ca864
avoid circular import
daniel-sanche Apr 10, 2024
2d7b88f
improved string typing
daniel-sanche Apr 10, 2024
2c950f2
got client and table working
daniel-sanche Apr 10, 2024
f73a225
moved config into yaml file
daniel-sanche Apr 11, 2024
92d5608
moved save_path into yaml
daniel-sanche Apr 11, 2024
7b40f27
fixed sync emulator channel
daniel-sanche Apr 11, 2024
b3546d8
moved acceptance tests to async folder
daniel-sanche Apr 11, 2024
cbec8f2
broke module_replacements into asyncio and imports
daniel-sanche Apr 11, 2024
1af60be
update retries_async
daniel-sanche Apr 11, 2024
e486ad5
replaced asyncio.Task
daniel-sanche Apr 11, 2024
1974a50
removed use of async generator
daniel-sanche Apr 11, 2024
64ec3d5
fixed type issue
daniel-sanche Apr 11, 2024
f67ebc4
got _mutate_rows tests passing
daniel-sanche Apr 11, 2024
2ebbb72
got sync read_rows tests passing
daniel-sanche Apr 11, 2024
0bfb257
got flow control tests passing
daniel-sanche Apr 12, 2024
5b9e967
made some fixes to mutations batcher
daniel-sanche Apr 12, 2024
717c401
improved modularity of mutation batcher tests
daniel-sanche Apr 12, 2024
aa472c1
fixed TableAssync references
daniel-sanche Apr 12, 2024
06f0586
added event for batcher lock
daniel-sanche Apr 12, 2024
d097aa1
got mutation batcher tests passing
daniel-sanche Apr 12, 2024
8df5920
did some refactoring of client tests
daniel-sanche Apr 13, 2024
dcc0ab6
added sharded read rows sync
daniel-sanche Apr 13, 2024
10c565f
made tweaks to read rows sharded
daniel-sanche Apr 13, 2024
2cdf0ab
got client tests passing
daniel-sanche Apr 16, 2024
ef892bc
moved more logic into generated portion
daniel-sanche Apr 16, 2024
fe572bc
got table tests passing
daniel-sanche Apr 16, 2024
1d428a6
got all client tests passing
daniel-sanche Apr 16, 2024
751c676
fixed slow test
daniel-sanche Apr 17, 2024
97f4797
got async read_rows acceptance working again
daniel-sanche Apr 17, 2024
92f2132
got acceptance tests working
daniel-sanche Apr 17, 2024
c006482
ran blacken
daniel-sanche Apr 17, 2024
c39d26a
fixed some lint issues
daniel-sanche Apr 17, 2024
d5903e2
fixed mypy issues
daniel-sanche Apr 18, 2024
f9862a2
refactored system tests into class
daniel-sanche Apr 18, 2024
5cb4e82
fixed retry type
daniel-sanche Apr 18, 2024
46ef676
renamed test file
daniel-sanche Apr 18, 2024
44840f3
got system tests generated
daniel-sanche Apr 18, 2024
3c33954
fixed possible flake
daniel-sanche Apr 18, 2024
7c9c3bd
changed class order to match async
daniel-sanche Apr 18, 2024
7661fec
renamed class
daniel-sanche Apr 18, 2024
8e2db64
experiment with CrossSync annotations
daniel-sanche Apr 19, 2024
bbe5005
hacked together sync generator to work with annoptations
daniel-sanche Apr 19, 2024
4cbeaba
added sync implementations of certain functions
daniel-sanche Apr 19, 2024
86c6cd9
added crosssync to mutations batcher
daniel-sanche May 30, 2024
7b4614b
made it work with private classes
daniel-sanche May 30, 2024
d69164a
got broken unit tests working for batcher
daniel-sanche Jun 20, 2024
63fe1d7
index for replaced file names
daniel-sanche Jun 20, 2024
d40c016
simplified surface generator
daniel-sanche Jun 20, 2024
888cfc2
implemented rename_sync decorator
daniel-sanche Jun 20, 2024
6d55305
remove class decorator and abc super class
daniel-sanche Jun 20, 2024
605f874
added support for class-based replacements
daniel-sanche Jun 20, 2024
e930e54
replaced async classes
daniel-sanche Jun 20, 2024
96f92b8
added generation to ReadRowsOperation
daniel-sanche Jun 20, 2024
f37aa38
set up symbol replacement for ReadRowsOperation
daniel-sanche Jun 20, 2024
37549e9
use cross sync for retries
daniel-sanche Jun 20, 2024
b2975fb
refactored how sync implementation is handled
daniel-sanche Jun 20, 2024
77165fd
fixed imports for mutate_rows
daniel-sanche Jun 20, 2024
9b04852
fixed typing for _read_rows
daniel-sanche Jun 20, 2024
d8542f4
fixing typing issues
daniel-sanche Jun 21, 2024
975ed12
fixing mypy issues
daniel-sanche Jun 21, 2024
2dc3de4
ran blacken
daniel-sanche Jun 21, 2024
9bb7f6e
fixed executor in mutations batcher
daniel-sanche Jun 21, 2024
cc03c15
improved mutations batcher close
daniel-sanche Jun 21, 2024
c71e5a9
updated docstring
daniel-sanche Jun 21, 2024
0500969
fixed event wait
daniel-sanche Jun 21, 2024
5cd4528
added way to ignore mypy errors on generated files
daniel-sanche Jun 21, 2024
c6840b8
fixed import lint
daniel-sanche Jun 21, 2024
b52b70b
fixed mutations batcher async tests
daniel-sanche Jun 21, 2024
a476627
fixed mock path
daniel-sanche Jun 21, 2024
948d4a2
added event loop check
daniel-sanche Jun 21, 2024
c6810b6
got client unit tests passing
daniel-sanche Jun 21, 2024
f7b7523
added missing await
daniel-sanche Jun 21, 2024
d15c4a4
added typing to cross sync
daniel-sanche Jun 21, 2024
360a204
fixed type errors
daniel-sanche Jun 21, 2024
99b23e5
remove unused metaclasses
daniel-sanche Jun 21, 2024
9246589
fixed warning in tests
daniel-sanche Jun 21, 2024
9cf3923
removed old sync tests
daniel-sanche Jun 21, 2024
c59eec2
got unit tests passing
daniel-sanche Jun 21, 2024
fac2583
Merge branch 'main' into sync_generator_cross_sync
daniel-sanche Jun 22, 2024
8ef9047
updated sync files
daniel-sanche Jun 22, 2024
0705ee9
got sharding working with cross_sync
daniel-sanche Jun 22, 2024
426057f
fixed mypy errors
daniel-sanche Jun 22, 2024
0c79e39
ran blacken
daniel-sanche Jun 22, 2024
4aa53eb
generate sync unit tests
daniel-sanche Jun 24, 2024
6ae2428
improved import generation
daniel-sanche Jun 24, 2024
fdce0bc
fixed cross sync import conditional
daniel-sanche Jun 24, 2024
adc8bb7
fixed import
daniel-sanche Jun 24, 2024
31fb77a
got rpc tests passing
daniel-sanche Jun 24, 2024
fc44b30
removed custom is_async
daniel-sanche Jun 24, 2024
255e124
support dropping methods
daniel-sanche Jun 24, 2024
87aecb3
got tests passing
daniel-sanche Jun 25, 2024
a1426a5
got test_client sync tests passing
daniel-sanche Jun 25, 2024
ba351e3
dded simplified transformer to crosssync
daniel-sanche Jun 28, 2024
d6fac8e
strip out basic sync code
daniel-sanche Jun 28, 2024
d25a517
include headers
daniel-sanche Jun 28, 2024
5dd32fd
add file imports
daniel-sanche Jul 2, 2024
cd40ba9
refactoring
daniel-sanche Jul 3, 2024
fd639e4
fixed docstrings
daniel-sanche Jul 3, 2024
c1053e9
made changes to docstring format in generated files
daniel-sanche Jul 3, 2024
c925760
simplified generator visit
daniel-sanche Jul 3, 2024
9f9ec0f
replace string types
daniel-sanche Jul 3, 2024
e5168a1
add mypyy disabling
daniel-sanche Jul 3, 2024
9bd13b3
regenerated imports
daniel-sanche Jul 3, 2024
6895968
got rename_sync decorator working
daniel-sanche Jul 3, 2024
09c090d
made convert decorator
daniel-sanche Jul 3, 2024
ab28899
added CrossSync.Awaitable
daniel-sanche Jul 3, 2024
40678e0
more targeted replacements
daniel-sanche Jul 3, 2024
79f7b74
got test conversion working
daniel-sanche Jul 3, 2024
2652158
keep try import blocks
daniel-sanche Jul 3, 2024
5b31779
refactored into node transformer
daniel-sanche Jul 3, 2024
53dbb77
added pytest decorator
daniel-sanche Jul 3, 2024
e93f2ac
got test_client passing again
daniel-sanche Jul 3, 2024
515f565
added mock to crosssync
daniel-sanche Jul 3, 2024
4c6bac2
more targeted replacements
daniel-sanche Jul 3, 2024
998829e
ran blacken
daniel-sanche Jul 4, 2024
40e961e
fixed some lint issues
daniel-sanche Jul 4, 2024
ec63aa7
got tests passing
daniel-sanche Jul 4, 2024
5cdf5d9
ran black
daniel-sanche Jul 4, 2024
b964c8d
use crossync for system tests
daniel-sanche Jul 8, 2024
a968905
don't manually clear channel refresh list
daniel-sanche Jul 8, 2024
95c30f8
mark each method to convert
daniel-sanche Jul 8, 2024
8c88f08
cleaning; adding docstrings
daniel-sanche Jul 8, 2024
c93597b
use custom class for decorators
daniel-sanche Jul 9, 2024
32f1631
mark pytest methods for conversion
daniel-sanche Jul 11, 2024
b6fb1d5
convert crosssync.pytest to sync
daniel-sanche Jul 11, 2024
f90d54c
removed unneeded check
daniel-sanche Jul 11, 2024
f5dfa3e
use AstDecorator for pytest_fixture
daniel-sanche Jul 11, 2024
ce45742
renamed class decorator
daniel-sanche Jul 11, 2024
49b4808
import instead of duplicate
daniel-sanche Jul 11, 2024
18e4977
import sync classes
daniel-sanche Jul 11, 2024
48bb06f
removed sync classes
daniel-sanche Jul 11, 2024
245bd08
removed conversion decorators
daniel-sanche Jul 11, 2024
74a69c3
removed main function from cross_sync
daniel-sanche Jul 11, 2024
276add1
removed sync classes from __init__.py
daniel-sanche Jul 11, 2024
d3906bf
removed unused file
daniel-sanche Jul 11, 2024
73c6e2f
remove sync pooled generator
daniel-sanche Jul 11, 2024
1911296
reverted some style changes
daniel-sanche Jul 11, 2024
e166bbe
removed left ofer crosssync.drop
daniel-sanche Jul 11, 2024
6b244c5
removed else branches of cross sync imports
daniel-sanche Jul 11, 2024
9e1afc3
fixed mypy error
daniel-sanche Jul 11, 2024
2a46630
added back file
daniel-sanche Jul 12, 2024
2691686
removed file
daniel-sanche Jul 12, 2024
f48604e
moved non-async helper classes out of async folder
daniel-sanche Jul 12, 2024
a61c54f
fixed lint
daniel-sanche Jul 12, 2024
8b379c8
improve version string calculation
daniel-sanche Jul 12, 2024
14259e2
created method for event loop verification
daniel-sanche Jul 12, 2024
9b7c1e2
reverted some behavior
daniel-sanche Jul 12, 2024
88fda0d
added comments
daniel-sanche Jul 12, 2024
c3787ca
added comments
daniel-sanche Jul 12, 2024
7f65063
removed sync implementation from cross_sync
daniel-sanche Jul 12, 2024
39ae907
added back conversion annotations
daniel-sanche Jul 12, 2024
fd1fb71
fixed decorator sync_impl call
daniel-sanche Jul 12, 2024
adb092e
use add_mapping in place of replace_symbols
daniel-sanche Jul 12, 2024
45efa16
support automatic attribute registration
daniel-sanche Jul 12, 2024
e1ec974
reduced replace_symbols usage in tests
daniel-sanche Jul 13, 2024
021fde2
fixed lint issues
daniel-sanche Jul 13, 2024
22b093f
moved ast decorators into new file
daniel-sanche Jul 17, 2024
8d13c5e
ran blacken
daniel-sanche Jul 17, 2024
769cac1
changed sync impl name
daniel-sanche Jul 17, 2024
f87b832
fixed mapping function
daniel-sanche Jul 17, 2024
f2b6d08
convert to async with pytest mark
daniel-sanche Jul 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion google/cloud/bigtable/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@
__all__ = (
"BigtableDataClientAsync",
"TableAsync",
"MutationsBatcherAsync",
"RowKeySamples",
"ReadRowsQuery",
"RowRange",
"MutationsBatcherAsync",
"Mutation",
"RowMutationEntry",
"SetCell",
Expand Down
37 changes: 19 additions & 18 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,36 @@
from __future__ import annotations

from typing import Sequence, TYPE_CHECKING
from dataclasses import dataclass
import functools

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries
import google.cloud.bigtable_v2.types.bigtable as types_pb
import google.cloud.bigtable.data.exceptions as bt_exceptions
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _retry_exception_factory

# mutate_rows requests are limited to this number of mutations
from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT
from google.cloud.bigtable.data.mutations import _EntryWithProto

from google.cloud.bigtable.data._sync.cross_sync import CrossSync

if TYPE_CHECKING:
from google.cloud.bigtable_v2.services.bigtable.async_client import (
BigtableAsyncClient,
)
from google.cloud.bigtable.data.mutations import RowMutationEntry
from google.cloud.bigtable.data._async.client import TableAsync


@dataclass
class _EntryWithProto:
"""
A dataclass to hold a RowMutationEntry and its corresponding proto representation.
"""
if CrossSync.is_async:
from google.cloud.bigtable_v2.services.bigtable.async_client import (
BigtableAsyncClient,
)

entry: RowMutationEntry
proto: types_pb.MutateRowsRequest.Entry
CrossSync.add_mapping("GapicClient", BigtableAsyncClient)


@CrossSync.export_sync(
path="google.cloud.bigtable.data._sync._mutate_rows._MutateRowsOperation",
add_mapping_for_name="_MutateRowsOperation",
)
class _MutateRowsOperationAsync:
"""
MutateRowsOperation manages the logic of sending a set of row mutations,
Expand All @@ -66,10 +64,11 @@ class _MutateRowsOperationAsync:
If not specified, the request will run until operation_timeout is reached.
"""

@CrossSync.convert
def __init__(
self,
gapic_client: "BigtableAsyncClient",
table: "TableAsync",
gapic_client: "CrossSync.GapicClient",
table: "CrossSync.Table",
mutation_entries: list["RowMutationEntry"],
operation_timeout: float,
attempt_timeout: float | None,
Expand Down Expand Up @@ -100,7 +99,7 @@ def __init__(
bt_exceptions._MutateRowsIncomplete,
)
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
self._operation = retries.retry_target_async(
self._operation = lambda: CrossSync.retry_target(
self._run_attempt,
self.is_retryable,
sleep_generator,
Expand All @@ -115,6 +114,7 @@ def __init__(
self.remaining_indices = list(range(len(self.mutations)))
self.errors: dict[int, list[Exception]] = {}

@CrossSync.convert
async def start(self):
"""
Start the operation, and run until completion
Expand All @@ -124,7 +124,7 @@ async def start(self):
"""
try:
# trigger mutate_rows
await self._operation
await self._operation()
except Exception as exc:
# exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
incomplete_indices = self.remaining_indices.copy()
Expand All @@ -151,6 +151,7 @@ async def start(self):
all_errors, len(self.mutations)
)

@CrossSync.convert
async def _run_attempt(self):
"""
Run a single attempt of the mutate_rows rpc.
Expand Down
45 changes: 21 additions & 24 deletions google/cloud/bigtable/data/_async/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,7 @@

from __future__ import annotations

from typing import (
TYPE_CHECKING,
AsyncGenerator,
AsyncIterable,
Awaitable,
Sequence,
)
from typing import Sequence

from google.cloud.bigtable_v2.types import ReadRowsRequest as ReadRowsRequestPB
from google.cloud.bigtable_v2.types import ReadRowsResponse as ReadRowsResponsePB
Expand All @@ -32,22 +26,21 @@
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.data.exceptions import InvalidChunk
from google.cloud.bigtable.data.exceptions import _RowSetComplete
from google.cloud.bigtable.data.exceptions import _ResetRow
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _retry_exception_factory

from google.api_core import retry as retries
from google.api_core.retry import exponential_sleep_generator

if TYPE_CHECKING:
from google.cloud.bigtable.data._async.client import TableAsync


class _ResetRow(Exception):
def __init__(self, chunk):
self.chunk = chunk
from google.cloud.bigtable.data._sync.cross_sync import CrossSync


@CrossSync.export_sync(
path="google.cloud.bigtable.data._sync._read_rows._ReadRowsOperation",
add_mapping_for_name="_ReadRowsOperation",
)
class _ReadRowsOperationAsync:
"""
ReadRowsOperation handles the logic of merging chunks from a ReadRowsResponse stream
Expand Down Expand Up @@ -82,7 +75,7 @@ class _ReadRowsOperationAsync:
def __init__(
self,
query: ReadRowsQuery,
table: "TableAsync",
table: "CrossSync.Table",
operation_timeout: float,
attempt_timeout: float,
retryable_exceptions: Sequence[type[Exception]] = (),
Expand All @@ -108,22 +101,22 @@ def __init__(
self._last_yielded_row_key: bytes | None = None
self._remaining_count: int | None = self.request.rows_limit or None

def start_operation(self) -> AsyncGenerator[Row, None]:
def start_operation(self) -> CrossSync.Iterable[Row]:
"""
Start the read_rows operation, retrying on retryable errors.

Yields:
Row: The next row in the stream
"""
return retries.retry_target_stream_async(
return CrossSync.retry_target_stream(
self._read_rows_attempt,
self._predicate,
exponential_sleep_generator(0.01, 60, multiplier=2),
self.operation_timeout,
exception_factory=_retry_exception_factory,
)

def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
def _read_rows_attempt(self) -> CrossSync.Iterable[Row]:
"""
Attempt a single read_rows rpc call.
This function is intended to be wrapped by retry logic,
Expand Down Expand Up @@ -159,9 +152,10 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
chunked_stream = self.chunk_stream(gapic_stream)
return self.merge_rows(chunked_stream)

@CrossSync.convert
async def chunk_stream(
self, stream: Awaitable[AsyncIterable[ReadRowsResponsePB]]
) -> AsyncGenerator[ReadRowsResponsePB.CellChunk, None]:
self, stream: CrossSync.Awaitable[CrossSync.Iterable[ReadRowsResponsePB]]
) -> CrossSync.Iterable[ReadRowsResponsePB.CellChunk]:
"""
process chunks out of raw read_rows stream

Expand Down Expand Up @@ -211,9 +205,12 @@ async def chunk_stream(
current_key = None

@staticmethod
@CrossSync.convert(
replace_symbols={"__aiter__": "__iter__", "__anext__": "__next__"}
)
async def merge_rows(
chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None
) -> AsyncGenerator[Row, None]:
chunks: CrossSync.Iterable[ReadRowsResponsePB.CellChunk] | None,
) -> CrossSync.Iterable[Row]:
"""
Merge chunks into rows

Expand All @@ -229,7 +226,7 @@ async def merge_rows(
while True:
try:
c = await it.__anext__()
except StopAsyncIteration:
except CrossSync.StopIteration:
# stream complete
return
row_key = c.row_key
Expand Down Expand Up @@ -322,7 +319,7 @@ async def merge_rows(
):
raise InvalidChunk("reset row with data")
continue
except StopAsyncIteration:
except CrossSync.StopIteration:
raise InvalidChunk("premature end of stream")

@staticmethod
Expand Down
Loading
Loading