Skip to content

Commit

Permalink
🦉 Updates from OwlBot post-processor
Browse files Browse the repository at this point in the history
  • Loading branch information
gcf-owl-bot[bot] committed Dec 12, 2024
1 parent ea28fc9 commit f7c7216
Show file tree
Hide file tree
Showing 167 changed files with 3,993 additions and 75,435 deletions.
128 changes: 92 additions & 36 deletions google/pubsub_v1/services/publisher/async_client.py

Large diffs are not rendered by default.

133 changes: 97 additions & 36 deletions google/pubsub_v1/services/publisher/client.py

Large diffs are not rendered by default.

48 changes: 30 additions & 18 deletions google/pubsub_v1/services/publisher/pagers.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __init__(
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiate the pager.
Expand All @@ -80,8 +80,10 @@ def __init__(
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicsRequest(request)
Expand Down Expand Up @@ -140,7 +142,7 @@ def __init__(
*,
retry: OptionalAsyncRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiates the pager.
Expand All @@ -154,8 +156,10 @@ def __init__(
retry (google.api_core.retry.AsyncRetry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicsRequest(request)
Expand Down Expand Up @@ -218,7 +222,7 @@ def __init__(
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiate the pager.
Expand All @@ -232,8 +236,10 @@ def __init__(
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSubscriptionsRequest(request)
Expand Down Expand Up @@ -292,7 +298,7 @@ def __init__(
*,
retry: OptionalAsyncRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiates the pager.
Expand All @@ -306,8 +312,10 @@ def __init__(
retry (google.api_core.retry.AsyncRetry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSubscriptionsRequest(request)
Expand Down Expand Up @@ -370,7 +378,7 @@ def __init__(
*,
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiate the pager.
Expand All @@ -384,8 +392,10 @@ def __init__(
retry (google.api_core.retry.Retry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSnapshotsRequest(request)
Expand Down Expand Up @@ -444,7 +454,7 @@ def __init__(
*,
retry: OptionalAsyncRetry = gapic_v1.method.DEFAULT,
timeout: Union[float, object] = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = ()
metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()
):
"""Instantiates the pager.
Expand All @@ -458,8 +468,10 @@ def __init__(
retry (google.api_core.retry.AsyncRetry): Designation of what errors,
if any, should be retried.
timeout (float): The timeout for this request.
metadata (Sequence[Tuple[str, str]]): Strings which should be
sent along with the request as metadata.
metadata (Sequence[Tuple[str, Union[str, bytes]]]): Key/value pairs which should be
sent along with the request as metadata. Normally, each value must be of type `str`,
but for metadata keys ending with the suffix `-bin`, the corresponding values must
be of type `bytes`.
"""
self._method = method
self._request = pubsub.ListTopicSnapshotsRequest(request)
Expand Down
114 changes: 100 additions & 14 deletions google/pubsub_v1/services/publisher/transports/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging as std_logging
import pickle
import warnings
from typing import Callable, Dict, Optional, Sequence, Tuple, Union

Expand All @@ -21,15 +24,93 @@
import google.auth # type: ignore
from google.auth import credentials as ga_credentials # type: ignore
from google.auth.transport.grpc import SslCredentials # type: ignore
from google.protobuf.json_format import MessageToJson
import google.protobuf.message

import grpc # type: ignore
import proto # type: ignore

from google.iam.v1 import iam_policy_pb2 # type: ignore
from google.iam.v1 import policy_pb2 # type: ignore
from google.protobuf import empty_pb2 # type: ignore
from google.pubsub_v1.types import pubsub
from .base import PublisherTransport, DEFAULT_CLIENT_INFO

try:
from google.api_core import client_logging # type: ignore

CLIENT_LOGGING_SUPPORTED = True # pragma: NO COVER
except ImportError: # pragma: NO COVER
CLIENT_LOGGING_SUPPORTED = False

_LOGGER = std_logging.getLogger(__name__)


class _LoggingClientInterceptor(grpc.UnaryUnaryClientInterceptor): # pragma: NO COVER
def intercept_unary_unary(self, continuation, client_call_details, request):
logging_enabled = CLIENT_LOGGING_SUPPORTED and _LOGGER.isEnabledFor(
std_logging.DEBUG
)
if logging_enabled: # pragma: NO COVER
request_metadata = client_call_details.metadata
if isinstance(request, proto.Message):
request_payload = type(request).to_json(request)
elif isinstance(request, google.protobuf.message.Message):
request_payload = MessageToJson(request)
else:
request_payload = f"{type(request).__name__}: {pickle.dumps(request)}"

request_metadata = {
key: value.decode("utf-8") if isinstance(value, bytes) else value
for key, value in request_metadata
}
grpc_request = {
"payload": request_payload,
"requestMethod": "grpc",
"metadata": dict(request_metadata),
}
_LOGGER.debug(
f"Sending request for {client_call_details.method}",
extra={
"serviceName": "google.pubsub.v1.Publisher",
"rpcName": client_call_details.method,
"request": grpc_request,
"metadata": grpc_request["metadata"],
},
)

response = continuation(client_call_details, request)
if logging_enabled: # pragma: NO COVER
response_metadata = response.trailing_metadata()
# Convert gRPC metadata `<class 'grpc.aio._metadata.Metadata'>` to list of tuples
metadata = (
dict([(k, str(v)) for k, v in response_metadata])
if response_metadata
else None
)
result = response.result()
if isinstance(result, proto.Message):
response_payload = type(result).to_json(result)
elif isinstance(result, google.protobuf.message.Message):
response_payload = MessageToJson(result)
else:
response_payload = f"{type(result).__name__}: {pickle.dumps(result)}"
grpc_response = {
"payload": response_payload,
"metadata": metadata,
"status": "OK",
}
_LOGGER.debug(
f"Received response for {client_call_details.method}.",
extra={
"serviceName": "google.pubsub.v1.Publisher",
"rpcName": client_call_details.method,
"response": grpc_response,
"metadata": grpc_response["metadata"],
},
)
return response


class PublisherGrpcTransport(PublisherTransport):
"""gRPC backend transport for Publisher.
Expand Down Expand Up @@ -186,7 +267,12 @@ def __init__(
],
)

# Wrap messages. This must be done after self._grpc_channel exists
self._interceptor = _LoggingClientInterceptor()
self._logged_channel = grpc.intercept_channel(
self._grpc_channel, self._interceptor
)

# Wrap messages. This must be done after self._logged_channel exists
self._prep_wrapped_messages(client_info)

@classmethod
Expand Down Expand Up @@ -260,7 +346,7 @@ def create_topic(self) -> Callable[[pubsub.Topic], pubsub.Topic]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "create_topic" not in self._stubs:
self._stubs["create_topic"] = self.grpc_channel.unary_unary(
self._stubs["create_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/CreateTopic",
request_serializer=pubsub.Topic.serialize,
response_deserializer=pubsub.Topic.deserialize,
Expand All @@ -286,7 +372,7 @@ def update_topic(self) -> Callable[[pubsub.UpdateTopicRequest], pubsub.Topic]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "update_topic" not in self._stubs:
self._stubs["update_topic"] = self.grpc_channel.unary_unary(
self._stubs["update_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/UpdateTopic",
request_serializer=pubsub.UpdateTopicRequest.serialize,
response_deserializer=pubsub.Topic.deserialize,
Expand All @@ -311,7 +397,7 @@ def publish(self) -> Callable[[pubsub.PublishRequest], pubsub.PublishResponse]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "publish" not in self._stubs:
self._stubs["publish"] = self.grpc_channel.unary_unary(
self._stubs["publish"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/Publish",
request_serializer=pubsub.PublishRequest.serialize,
response_deserializer=pubsub.PublishResponse.deserialize,
Expand All @@ -335,7 +421,7 @@ def get_topic(self) -> Callable[[pubsub.GetTopicRequest], pubsub.Topic]:
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "get_topic" not in self._stubs:
self._stubs["get_topic"] = self.grpc_channel.unary_unary(
self._stubs["get_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/GetTopic",
request_serializer=pubsub.GetTopicRequest.serialize,
response_deserializer=pubsub.Topic.deserialize,
Expand All @@ -361,7 +447,7 @@ def list_topics(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "list_topics" not in self._stubs:
self._stubs["list_topics"] = self.grpc_channel.unary_unary(
self._stubs["list_topics"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/ListTopics",
request_serializer=pubsub.ListTopicsRequest.serialize,
response_deserializer=pubsub.ListTopicsResponse.deserialize,
Expand Down Expand Up @@ -390,7 +476,7 @@ def list_topic_subscriptions(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "list_topic_subscriptions" not in self._stubs:
self._stubs["list_topic_subscriptions"] = self.grpc_channel.unary_unary(
self._stubs["list_topic_subscriptions"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/ListTopicSubscriptions",
request_serializer=pubsub.ListTopicSubscriptionsRequest.serialize,
response_deserializer=pubsub.ListTopicSubscriptionsResponse.deserialize,
Expand Down Expand Up @@ -423,7 +509,7 @@ def list_topic_snapshots(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "list_topic_snapshots" not in self._stubs:
self._stubs["list_topic_snapshots"] = self.grpc_channel.unary_unary(
self._stubs["list_topic_snapshots"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/ListTopicSnapshots",
request_serializer=pubsub.ListTopicSnapshotsRequest.serialize,
response_deserializer=pubsub.ListTopicSnapshotsResponse.deserialize,
Expand Down Expand Up @@ -452,7 +538,7 @@ def delete_topic(self) -> Callable[[pubsub.DeleteTopicRequest], empty_pb2.Empty]
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "delete_topic" not in self._stubs:
self._stubs["delete_topic"] = self.grpc_channel.unary_unary(
self._stubs["delete_topic"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/DeleteTopic",
request_serializer=pubsub.DeleteTopicRequest.serialize,
response_deserializer=empty_pb2.Empty.FromString,
Expand Down Expand Up @@ -484,7 +570,7 @@ def detach_subscription(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "detach_subscription" not in self._stubs:
self._stubs["detach_subscription"] = self.grpc_channel.unary_unary(
self._stubs["detach_subscription"] = self._logged_channel.unary_unary(
"/google.pubsub.v1.Publisher/DetachSubscription",
request_serializer=pubsub.DetachSubscriptionRequest.serialize,
response_deserializer=pubsub.DetachSubscriptionResponse.deserialize,
Expand All @@ -509,7 +595,7 @@ def set_iam_policy(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "set_iam_policy" not in self._stubs:
self._stubs["set_iam_policy"] = self.grpc_channel.unary_unary(
self._stubs["set_iam_policy"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/SetIamPolicy",
request_serializer=iam_policy_pb2.SetIamPolicyRequest.SerializeToString,
response_deserializer=policy_pb2.Policy.FromString,
Expand All @@ -535,7 +621,7 @@ def get_iam_policy(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "get_iam_policy" not in self._stubs:
self._stubs["get_iam_policy"] = self.grpc_channel.unary_unary(
self._stubs["get_iam_policy"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/GetIamPolicy",
request_serializer=iam_policy_pb2.GetIamPolicyRequest.SerializeToString,
response_deserializer=policy_pb2.Policy.FromString,
Expand Down Expand Up @@ -564,15 +650,15 @@ def test_iam_permissions(
# gRPC handles serialization and deserialization, so we just need
# to pass in the functions for each.
if "test_iam_permissions" not in self._stubs:
self._stubs["test_iam_permissions"] = self.grpc_channel.unary_unary(
self._stubs["test_iam_permissions"] = self._logged_channel.unary_unary(
"/google.iam.v1.IAMPolicy/TestIamPermissions",
request_serializer=iam_policy_pb2.TestIamPermissionsRequest.SerializeToString,
response_deserializer=iam_policy_pb2.TestIamPermissionsResponse.FromString,
)
return self._stubs["test_iam_permissions"]

def close(self):
self.grpc_channel.close()
self._logged_channel.close()

@property
def kind(self) -> str:
Expand Down
Loading

0 comments on commit f7c7216

Please sign in to comment.