
Commit

Merge branch 'cross_sync2_pr2_annotations' into cross_sync2_pr3_generated_sync
daniel-sanche committed Nov 12, 2024
2 parents 2d04db7 + d489ad3 commit e0ab0b7
Showing 7 changed files with 126 additions and 142 deletions.
2 changes: 2 additions & 0 deletions google/cloud/bigtable/data/__init__.py
@@ -68,6 +68,8 @@
CrossSync._Sync_Impl.add_mapping("_ReadRowsOperation", _ReadRowsOperation)
CrossSync.add_mapping("_MutateRowsOperation", _MutateRowsOperationAsync)
CrossSync._Sync_Impl.add_mapping("_MutateRowsOperation", _MutateRowsOperation)
CrossSync.add_mapping("MutationsBatcher", MutationsBatcherAsync)
CrossSync._Sync_Impl.add_mapping("MutationsBatcher", MutationsBatcher)

__version__: str = package_version.__version__

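The two new add_mapping calls register the batcher classes on both CrossSync namespaces, so shared code can refer to CrossSync.MutationsBatcher and resolve to the async or sync class depending on the variant. Below is a minimal sketch of how such a mapping registry can work; the class and method names are illustrative stand-ins, not the library's actual internals.

```python
# Hedged sketch of a CrossSync-style mapping registry; names here are
# illustrative stand-ins, not the library's real internals.
class _MappingNamespace:
    @classmethod
    def add_mapping(cls, name: str, value: object) -> None:
        # Expose `value` as a class attribute so shared call sites can
        # write Namespace.MutationsBatcher and get the registered class.
        setattr(cls, name, value)

class AsyncNamespace(_MappingNamespace):
    is_async = True

class SyncNamespace(_MappingNamespace):
    is_async = False

class MutationsBatcherAsync: ...
class MutationsBatcher: ...

# mirrors the registration lines in the hunk above
AsyncNamespace.add_mapping("MutationsBatcher", MutationsBatcherAsync)
SyncNamespace.add_mapping("MutationsBatcher", MutationsBatcher)

assert AsyncNamespace.MutationsBatcher is MutationsBatcherAsync
assert SyncNamespace.MutationsBatcher is MutationsBatcher
```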
53 changes: 32 additions & 21 deletions google/cloud/bigtable/data/_async/client.py
@@ -83,31 +83,34 @@
from google.cloud.bigtable_v2.services.bigtable.transports import (
BigtableGrpcAsyncIOTransport as TransportType,
)
from google.cloud.bigtable.data._async.mutations_batcher import (
MutationsBatcherAsync,
_MB_SIZE,
)
from google.cloud.bigtable.data.execute_query._async.execute_query_iterator import (
ExecuteQueryIteratorAsync,
)

from google.cloud.bigtable.data._async.mutations_batcher import _MB_SIZE
else:
from typing import Iterable # noqa: F401
from grpc import insecure_channel
from google.cloud.bigtable_v2.services.bigtable.transports import BigtableGrpcTransport as TransportType # type: ignore
from google.cloud.bigtable.data._sync_autogen.mutations_batcher import ( # noqa: F401
MutationsBatcher,
_MB_SIZE,
)
from google.cloud.bigtable.data.execute_query._sync_autogen.execute_query_iterator import ( # noqa: F401
ExecuteQueryIterator,
)
from google.cloud.bigtable.data._sync_autogen.mutations_batcher import _MB_SIZE


if TYPE_CHECKING:
from google.cloud.bigtable.data._helpers import RowKeySamples
from google.cloud.bigtable.data._helpers import ShardedQuery

if CrossSync.is_async:
from google.cloud.bigtable.data._async.mutations_batcher import (
MutationsBatcherAsync,
)
from google.cloud.bigtable.data.execute_query._async.execute_query_iterator import (
ExecuteQueryIteratorAsync,
)
else:
from google.cloud.bigtable.data._sync_autogen.mutations_batcher import ( # noqa: F401
MutationsBatcher,
)
from google.cloud.bigtable.data.execute_query._sync_autogen.execute_query_iterator import ( # noqa: F401
ExecuteQueryIterator,
)


__CROSS_SYNC_OUTPUT__ = "google.cloud.bigtable.data._sync_autogen.client"


@@ -276,7 +279,9 @@ async def close(self, timeout: float | None = 2.0):

@CrossSync.convert
async def _ping_and_warm_instances(
self, instance_key: _WarmedInstanceKey | None = None, channel: Channel | None = None
self,
instance_key: _WarmedInstanceKey | None = None,
channel: Channel | None = None,
) -> list[BaseException | None]:
"""
Prepares the backend for requests on a channel
@@ -366,8 +371,14 @@ async def _manage_channel(
await self._ping_and_warm_instances(channel=new_channel)
# cycle channel out of use, with long grace window before closure
self.transport._grpc_channel = new_channel
await old_channel.close(grace_period)
# subtract the time spent waiting for the channel to be replaced
# give old_channel a chance to complete existing rpcs
if CrossSync.is_async:
await old_channel.close(grace_period)
else:
if grace_period:
self._is_closed.wait(grace_period) # type: ignore
old_channel.close() # type: ignore
# subtract the time spent waiting for the channel to be replaced
next_refresh = random.uniform(refresh_interval_min, refresh_interval_max)
next_sleep = max(next_refresh - (time.monotonic() - start_timestamp), 0)
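The branch above is needed because grpc.aio.Channel.close() accepts a grace period, while the synchronous grpc.Channel.close() takes no arguments; the sync path therefore emulates the grace window by waiting on the client's _is_closed event before closing. A standalone sketch of the two retirement paths, with closed_event standing in for that event:

```python
# Hedged sketch of the channel-retirement pattern above, outside CrossSync.
# `closed_event` stands in for the client's `_is_closed` event.
import threading

import grpc
import grpc.aio

def retire_channel_sync(
    old_channel: grpc.Channel,
    closed_event: threading.Event,
    grace_period: float | None,
) -> None:
    # sync close() takes no grace argument, so wait out the grace window
    # (or until the client shuts down) before tearing the channel down
    if grace_period:
        closed_event.wait(grace_period)
    old_channel.close()

async def retire_channel_async(
    old_channel: grpc.aio.Channel, grace_period: float | None
) -> None:
    # grpc.aio supports a grace period natively: in-flight RPCs get a
    # chance to finish before the channel is destroyed
    await old_channel.close(grace_period)
```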

@@ -414,7 +425,7 @@ async def _register_instance(
}
)
async def _remove_instance_registration(
self, instance_id: str, owner: TableAsync | ExecuteQueryIteratorAsync
self, instance_id: str, owner: TableAsync | "ExecuteQueryIteratorAsync"
) -> bool:
"""
Removes an instance from the client's registered instances, to prevent
@@ -578,7 +589,7 @@ async def execute_query(
"proto_format": {},
}

return ExecuteQueryIteratorAsync(
return CrossSync.ExecuteQueryIterator(
self,
instance_id,
app_profile_id,
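execute_query now returns CrossSync.ExecuteQueryIterator instead of naming ExecuteQueryIteratorAsync directly, which lets the code generator emit the identical line for the sync variant (see the matching CrossSync._Sync_Impl.ExecuteQueryIterator hunk later in this diff). A compact sketch of that one-call-site, two-resolutions idea, with dummy classes standing in for the real iterators:

```python
# Hedged sketch: one shared call site resolving to different classes per
# variant; dummy classes stand in for the real iterator implementations.
class ExecuteQueryIteratorAsync: ...
class ExecuteQueryIterator: ...

class _MappingNamespace:
    @classmethod
    def add_mapping(cls, name: str, value: object) -> None:
        setattr(cls, name, value)

class AsyncNamespace(_MappingNamespace): ...
class SyncNamespace(_MappingNamespace): ...

AsyncNamespace.add_mapping("ExecuteQueryIterator", ExecuteQueryIteratorAsync)
SyncNamespace.add_mapping("ExecuteQueryIterator", ExecuteQueryIterator)

# the async source writes one line ...
assert AsyncNamespace.ExecuteQueryIterator is ExecuteQueryIteratorAsync
# ... and the generated sync file resolves the same name differently
assert SyncNamespace.ExecuteQueryIterator is ExecuteQueryIterator
```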
@@ -1125,7 +1136,7 @@ def mutations_batcher(
batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
batch_retryable_errors: Sequence[type[Exception]]
| TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
) -> MutationsBatcherAsync:
) -> "MutationsBatcherAsync":
"""
Returns a new mutations batcher instance.
4 changes: 1 addition & 3 deletions google/cloud/bigtable/data/_async/mutations_batcher.py
@@ -176,9 +176,7 @@ async def add_to_flow(self, mutations: RowMutationEntry | list[RowMutationEntry]
yield mutations[start_idx:end_idx]


@CrossSync.convert_class(
sync_name="MutationsBatcher", add_mapping_for_name="MutationsBatcher"
)
@CrossSync.convert_class(sync_name="MutationsBatcher")
class MutationsBatcherAsync:
"""
Allows users to send batches using context manager API:
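Dropping add_mapping_for_name here pairs with the explicit add_mapping calls added to __init__.py: registration moves out of the decorator and into the package root. Below is a hedged sketch of what a convert_class-style marker decorator records; the real CrossSync.convert_class drives a code generator, and only the sync_name parameter below is taken from the diff.

```python
# Hedged sketch of a convert_class-style marker decorator. The real
# decorator drives code generation; everything but the `sync_name`
# parameter is illustrative.
def convert_class(sync_name: str | None = None, **hints):
    def decorator(cls):
        # stash generation hints for a later codegen pass; the class is
        # returned unchanged at runtime
        cls._cross_sync_metadata = {"sync_name": sync_name, **hints}
        return cls

    return decorator

@convert_class(sync_name="MutationsBatcher")
class MutationsBatcherAsync:
    """Async implementation; a sync twin is generated under sync_name."""
```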
24 changes: 13 additions & 11 deletions google/cloud/bigtable/data/_sync_autogen/client.py
@@ -69,17 +69,17 @@
from google.cloud.bigtable_v2.services.bigtable.transports import (
BigtableGrpcTransport as TransportType,
)
from google.cloud.bigtable.data._sync_autogen.mutations_batcher import (
MutationsBatcher,
_MB_SIZE,
)
from google.cloud.bigtable.data.execute_query._sync_autogen.execute_query_iterator import (
ExecuteQueryIterator,
)
from google.cloud.bigtable.data._sync_autogen.mutations_batcher import _MB_SIZE

if TYPE_CHECKING:
from google.cloud.bigtable.data._helpers import RowKeySamples
from google.cloud.bigtable.data._helpers import ShardedQuery
from google.cloud.bigtable.data._sync_autogen.mutations_batcher import (
MutationsBatcher,
)
from google.cloud.bigtable.data.execute_query._sync_autogen.execute_query_iterator import (
ExecuteQueryIterator,
)


@CrossSync._Sync_Impl.add_mapping_decorator("DataClient")
@@ -284,7 +284,9 @@ def _manage_channel(
new_channel = self.transport.create_channel()
self._ping_and_warm_instances(channel=new_channel)
self.transport._grpc_channel = new_channel
old_channel.close(grace_period)
if grace_period:
self._is_closed.wait(grace_period)
old_channel.close()
next_refresh = random.uniform(refresh_interval_min, refresh_interval_max)
next_sleep = max(next_refresh - (time.monotonic() - start_timestamp), 0)

@@ -314,7 +316,7 @@ def _register_instance(
self._start_background_channel_refresh()

def _remove_instance_registration(
self, instance_id: str, owner: Table | ExecuteQueryIterator
self, instance_id: str, owner: Table | "ExecuteQueryIterator"
) -> bool:
"""Removes an instance from the client's registered instances, to prevent
warming new channels for the instance
@@ -452,7 +454,7 @@ def execute_query(
"params": pb_params,
"proto_format": {},
}
return ExecuteQueryIterator(
return CrossSync._Sync_Impl.ExecuteQueryIterator(
self,
instance_id,
app_profile_id,
@@ -945,7 +947,7 @@ def mutations_batcher(
batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
batch_retryable_errors: Sequence[type[Exception]]
| TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
) -> MutationsBatcher:
) -> "MutationsBatcher":
"""Returns a new mutations batcher instance.
Can be used to iteratively add mutations that are flushed as a group,
google/cloud/bigtable/data/_sync_autogen/mutations_batcher.py
@@ -146,7 +146,6 @@ def add_to_flow(self, mutations: RowMutationEntry | list[RowMutationEntry]):
yield mutations[start_idx:end_idx]


@CrossSync._Sync_Impl.add_mapping_decorator("MutationsBatcher")
class MutationsBatcher:
"""
Allows users to send batches using context manager API:
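For context on the docstring's "context manager API", here is a hedged usage sketch of the sync batcher; the instance, table, and mutation values are illustrative placeholders, and the batcher defaults follow the mutations_batcher signature shown earlier in this diff.

```python
# Hedged usage sketch; identifiers are illustrative placeholders.
from google.cloud.bigtable.data import BigtableDataClient
from google.cloud.bigtable.data.mutations import RowMutationEntry, SetCell

client = BigtableDataClient()
table = client.get_table("my-instance", "my-table")

# the batcher buffers mutations and flushes them in the background;
# leaving the context flushes anything still pending
with table.mutations_batcher() as batcher:
    batcher.append(
        RowMutationEntry("row-key", [SetCell("family", "qualifier", b"value")])
    )
```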