diff --git a/sdks/python/apache_beam/ml/transforms/embeddings/vertex_ai.py b/sdks/python/apache_beam/ml/transforms/embeddings/vertex_ai.py index dd70cd09b2ba..6df505508ae9 100644 --- a/sdks/python/apache_beam/ml/transforms/embeddings/vertex_ai.py +++ b/sdks/python/apache_beam/ml/transforms/embeddings/vertex_ai.py @@ -19,10 +19,10 @@ # Follow https://cloud.google.com/vertex-ai/docs/python-sdk/use-vertex-ai-python-sdk # pylint: disable=line-too-long # to install Vertex AI Python SDK. -from collections.abc import Iterable -from collections.abc import Sequence import logging import time +from collections.abc import Iterable +from collections.abc import Sequence from typing import Any from typing import Optional @@ -101,6 +101,15 @@ def __init__( self.task_type = task_type self.title = title + # Configure AdaptiveThrottler and throttling metrics for client-side + # throttling behavior. + # See https://docs.google.com/document/d/1ePorJGZnLbNCmLD9mR7iFYOdPsyDA1rDnTpYnbdrzSU/edit?usp=sharing + # for more details. + self.throttled_secs = Metrics.counter( + VertexAIImageEmbeddings, "cumulativeThrottlingSeconds") + self.throttler = AdaptiveThrottler( + window_ms=1, bucket_ms=1, overload_ratio=2) + @retry.with_exponential_backoff( num_retries=5, retry_filter=_retry_on_appropriate_gcp_error) def get_request(