From e0f2b703f36db152e0facf4d54f6f6475b8fcd88 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Mon, 11 Nov 2024 12:20:15 -0500 Subject: [PATCH 1/2] Update transforms module to PEP 585 type hints --- .../apache_beam/transforms/batch_dofn_test.py | 38 +++++------ .../combinefn_lifecycle_pipeline.py | 8 +-- .../apache_beam/transforms/combiners.py | 68 +++++++++---------- .../apache_beam/transforms/enrichment.py | 11 ++- .../transforms/enrichment_it_test.py | 8 +-- .../apache_beam/transforms/environments.py | 49 ++++++------- .../python/apache_beam/transforms/external.py | 5 +- .../transforms/external_transform_provider.py | 17 ++--- sdks/python/apache_beam/transforms/managed.py | 5 +- .../apache_beam/transforms/ptransform.py | 36 +++++----- .../apache_beam/transforms/resources.py | 17 +++-- .../apache_beam/transforms/sideinputs.py | 5 +- sdks/python/apache_beam/transforms/stats.py | 28 ++++---- .../transforms/timestamped_value_type_test.py | 12 ++-- .../apache_beam/transforms/userstate.py | 19 +++--- .../apache_beam/transforms/userstate_test.py | 3 +- sdks/python/apache_beam/transforms/util.py | 56 ++++++++------- .../apache_beam/transforms/util_test.py | 2 +- sdks/python/apache_beam/transforms/window.py | 13 ++-- 19 files changed, 181 insertions(+), 219 deletions(-) diff --git a/sdks/python/apache_beam/transforms/batch_dofn_test.py b/sdks/python/apache_beam/transforms/batch_dofn_test.py index d2aceb371492..f84b3689d9be 100644 --- a/sdks/python/apache_beam/transforms/batch_dofn_test.py +++ b/sdks/python/apache_beam/transforms/batch_dofn_test.py @@ -20,9 +20,7 @@ # pytype: skip-file import unittest -from typing import Iterator -from typing import List -from typing import Tuple +from collections.abc import Iterator from typing import no_type_check from parameterized import parameterized_class @@ -36,13 +34,13 @@ def process(self, element: int, *args, **kwargs) -> Iterator[float]: class BatchDoFn(beam.DoFn): - def process_batch(self, batch: List[int], *args, - **kwargs) -> Iterator[List[float]]: + def process_batch(self, batch: list[int], *args, + **kwargs) -> Iterator[list[float]]: yield [element / 2 for element in batch] class NoReturnAnnotation(beam.DoFn): - def process_batch(self, batch: List[int], *args, **kwargs): + def process_batch(self, batch: list[int], *args, **kwargs): yield [element * 2 for element in batch] @@ -51,24 +49,24 @@ def process_batch(self, batch, *args, **kwargs): yield [element * 2 for element in batch] def get_input_batch_type(self, input_element_type): - return List[input_element_type] + return list[input_element_type] def get_output_batch_type(self, input_element_type): - return List[input_element_type] + return list[input_element_type] class EitherDoFn(beam.DoFn): def process(self, element: int, *args, **kwargs) -> Iterator[float]: yield element / 2 - def process_batch(self, batch: List[int], *args, - **kwargs) -> Iterator[List[float]]: + def process_batch(self, batch: list[int], *args, + **kwargs) -> Iterator[list[float]]: yield [element / 2 for element in batch] class ElementToBatchDoFn(beam.DoFn): @beam.DoFn.yields_batches - def process(self, element: int, *args, **kwargs) -> Iterator[List[int]]: + def process(self, element: int, *args, **kwargs) -> Iterator[list[int]]: yield [element] * element def infer_output_type(self, input_element_type): @@ -77,8 +75,8 @@ def infer_output_type(self, input_element_type): class BatchToElementDoFn(beam.DoFn): @beam.DoFn.yields_elements - def process_batch(self, batch: List[int], *args, - **kwargs) -> Iterator[Tuple[int, 
int]]: + def process_batch(self, batch: list[int], *args, + **kwargs) -> Iterator[tuple[int, int]]: yield (sum(batch), len(batch)) @@ -178,11 +176,11 @@ class MismatchedBatchProducingDoFn(beam.DoFn): mismatched return types (one yields floats, the other ints). Should yield a construction time error when applied.""" @beam.DoFn.yields_batches - def process(self, element: int, *args, **kwargs) -> Iterator[List[int]]: + def process(self, element: int, *args, **kwargs) -> Iterator[list[int]]: yield [element] - def process_batch(self, batch: List[int], *args, - **kwargs) -> Iterator[List[float]]: + def process_batch(self, batch: list[int], *args, + **kwargs) -> Iterator[list[float]]: yield [element / 2 for element in batch] @@ -194,13 +192,13 @@ def process(self, element: int, *args, **kwargs) -> Iterator[float]: yield element / 2 @beam.DoFn.yields_elements - def process_batch(self, batch: List[int], *args, **kwargs) -> Iterator[int]: + def process_batch(self, batch: list[int], *args, **kwargs) -> Iterator[int]: yield batch[0] class NoElementOutputAnnotation(beam.DoFn): - def process_batch(self, batch: List[int], *args, - **kwargs) -> Iterator[List[int]]: + def process_batch(self, batch: list[int], *args, + **kwargs) -> Iterator[list[int]]: yield [element * 2 for element in batch] @@ -225,7 +223,7 @@ def test_no_input_annotation_raises(self): def test_unsupported_dofn_param_raises(self): class BadParam(beam.DoFn): @no_type_check - def process_batch(self, batch: List[int], key=beam.DoFn.KeyParam): + def process_batch(self, batch: list[int], key=beam.DoFn.KeyParam): yield batch * key p = beam.Pipeline() diff --git a/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py b/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py index 56610e95297f..40495312964f 100644 --- a/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py +++ b/sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py @@ -18,8 +18,6 @@ # pytype: skip-file import math -from typing import Set -from typing import Tuple import apache_beam as beam from apache_beam.options.pipeline_options import TypeOptions @@ -36,7 +34,7 @@ @with_input_types(int) @with_output_types(int) class CallSequenceEnforcingCombineFn(beam.CombineFn): - instances: Set['CallSequenceEnforcingCombineFn'] = set() + instances: set['CallSequenceEnforcingCombineFn'] = set() def __init__(self): super().__init__() @@ -81,8 +79,8 @@ def teardown(self, *args, **kwargs): self._teardown_called = True -@with_input_types(Tuple[None, str]) -@with_output_types(Tuple[int, str]) +@with_input_types(tuple[None, str]) +@with_output_types(tuple[int, str]) class IndexAssigningDoFn(beam.DoFn): state_param = beam.DoFn.StateParam( userstate.CombiningValueStateSpec( diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 8b05e8da1df5..bc736b327284 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -24,12 +24,8 @@ import itertools import operator import random +from collections.abc import Iterable from typing import Any -from typing import Dict -from typing import Iterable -from typing import List -from typing import Set -from typing import Tuple from typing import TypeVar from typing import Union @@ -146,15 +142,15 @@ def expand(self, pcoll): else: return pcoll | core.CombineGlobally(CountCombineFn()).without_defaults() - @with_input_types(Tuple[K, V]) - @with_output_types(Tuple[K, int]) + @with_input_types(tuple[K, 
V]) + @with_output_types(tuple[K, int]) class PerKey(ptransform.PTransform): """combiners.Count.PerKey counts how many elements each unique key has.""" def expand(self, pcoll): return pcoll | core.CombinePerKey(CountCombineFn()) @with_input_types(T) - @with_output_types(Tuple[T, int]) + @with_output_types(tuple[T, int]) class PerElement(ptransform.PTransform): """combiners.Count.PerElement counts how many times each element occurs.""" def expand(self, pcoll): @@ -193,7 +189,7 @@ class Top(object): # pylint: disable=no-self-argument @with_input_types(T) - @with_output_types(List[T]) + @with_output_types(list[T]) class Of(CombinerWithoutDefaults): """Returns the n greatest elements in the PCollection. @@ -246,8 +242,8 @@ def expand(self, pcoll): TopCombineFn(self._n, self._key, self._reverse)).without_defaults() - @with_input_types(Tuple[K, V]) - @with_output_types(Tuple[K, List[V]]) + @with_input_types(tuple[K, V]) + @with_output_types(tuple[K, list[V]]) class PerKey(ptransform.PTransform): """Identifies the N greatest elements associated with each key. @@ -280,7 +276,7 @@ def expand(self, pcoll): """Expands the transform. Raises TypeCheckError: If the output type of the input PCollection is not - compatible with Tuple[A, B]. + compatible with tuple[A, B]. Args: pcoll: PCollection to process @@ -323,7 +319,7 @@ def SmallestPerKey(pcoll, n, *, key=None): @with_input_types(T) -@with_output_types(Tuple[None, List[T]]) +@with_output_types(tuple[None, list[T]]) class _TopPerBundle(core.DoFn): def __init__(self, n, key, reverse): self._n = n @@ -356,8 +352,8 @@ def finish_bundle(self): yield window.GlobalWindows.windowed_value((None, self._heap)) -@with_input_types(Tuple[None, Iterable[List[T]]]) -@with_output_types(List[T]) +@with_input_types(tuple[None, Iterable[list[T]]]) +@with_output_types(list[T]) class _MergeTopPerBundle(core.DoFn): def __init__(self, n, key, reverse): self._n = n @@ -380,7 +376,7 @@ def push(hp, e): return False if self._compare or self._key: - heapc: List[cy_combiners.ComparableValue] = [] + heapc: list[cy_combiners.ComparableValue] = [] for bundle in bundles: if not heapc: heapc = [ @@ -421,7 +417,7 @@ def push(hp, e): @with_input_types(T) -@with_output_types(List[T]) +@with_output_types(list[T]) class TopCombineFn(core.CombineFn): """CombineFn doing the combining for all of the Top transforms. @@ -467,7 +463,7 @@ def display_data(self): } # The accumulator type is a tuple - # (bool, Union[List[T], List[ComparableValue[T]]) + # (bool, Union[list[T], list[ComparableValue[T]]) # where the boolean indicates whether the second slot contains a List of T # (False) or List of ComparableValue[T] (True). In either case, the List # maintains heap invariance. 
When the contents of the List are @@ -564,7 +560,7 @@ class Sample(object): # pylint: disable=no-self-argument @with_input_types(T) - @with_output_types(List[T]) + @with_output_types(list[T]) class FixedSizeGlobally(CombinerWithoutDefaults): """Sample n elements from the input PCollection without replacement.""" def __init__(self, n): @@ -584,8 +580,8 @@ def display_data(self): def default_label(self): return 'FixedSizeGlobally(%d)' % self._n - @with_input_types(Tuple[K, V]) - @with_output_types(Tuple[K, List[V]]) + @with_input_types(tuple[K, V]) + @with_output_types(tuple[K, list[V]]) class FixedSizePerKey(ptransform.PTransform): """Sample n elements associated with each key without replacement.""" def __init__(self, n): @@ -602,7 +598,7 @@ def default_label(self): @with_input_types(T) -@with_output_types(List[T]) +@with_output_types(list[T]) class SampleCombineFn(core.CombineFn): """CombineFn for all Sample transforms.""" def __init__(self, n): @@ -734,7 +730,7 @@ def add_input(self, accumulator, element, *args, **kwargs): @with_input_types(T) -@with_output_types(List[T]) +@with_output_types(list[T]) class ToList(CombinerWithoutDefaults): """A global CombineFn that condenses a PCollection into a single list.""" def expand(self, pcoll): @@ -746,7 +742,7 @@ def expand(self, pcoll): @with_input_types(T) -@with_output_types(List[T]) +@with_output_types(list[T]) class ToListCombineFn(core.CombineFn): """CombineFn for to_list.""" def create_accumulator(self): @@ -780,8 +776,8 @@ def extract_output(self, accumulator): return accumulator -@with_input_types(Tuple[K, V]) -@with_output_types(Dict[K, V]) +@with_input_types(tuple[K, V]) +@with_output_types(dict[K, V]) class ToDict(CombinerWithoutDefaults): """A global CombineFn that condenses a PCollection into a single dict. 
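Beam's with_input_types/with_output_types decorators accept the PEP 585 builtin generics directly, so `tuple[K, V]` and `dict[K, V]` above type-check exactly like the old `Tuple`/`Dict` forms at pipeline construction time. A minimal sketch, assuming Python 3.9+ (SumPerKeyFn is a hypothetical CombineFn used only for illustration, not part of this patch):

    from apache_beam.transforms.core import CombineFn
    from apache_beam.typehints import with_input_types
    from apache_beam.typehints import with_output_types

    @with_input_types(tuple[str, int])   # was: Tuple[str, int]
    @with_output_types(dict[str, int])   # was: Dict[str, int]
    class SumPerKeyFn(CombineFn):
        """Condenses (key, value) pairs into a single {key: sum} dict."""
        def create_accumulator(self):
            return {}

        def add_input(self, accumulator, element):
            key, value = element
            accumulator[key] = accumulator.get(key, 0) + value
            return accumulator

        def merge_accumulators(self, accumulators):
            merged = {}
            for accumulator in accumulators:
                for key, value in accumulator.items():
                    merged[key] = merged.get(key, 0) + value
            return merged

        def extract_output(self, accumulator):
            return accumulator
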
@@ -797,8 +793,8 @@ def expand(self, pcoll): ToDictCombineFn()).without_defaults() -@with_input_types(Tuple[K, V]) -@with_output_types(Dict[K, V]) +@with_input_types(tuple[K, V]) +@with_output_types(dict[K, V]) class ToDictCombineFn(core.CombineFn): """CombineFn for to_dict.""" def create_accumulator(self): @@ -820,7 +816,7 @@ def extract_output(self, accumulator): @with_input_types(T) -@with_output_types(Set[T]) +@with_output_types(set[T]) class ToSet(CombinerWithoutDefaults): """A global CombineFn that condenses a PCollection into a set.""" def expand(self, pcoll): @@ -832,7 +828,7 @@ def expand(self, pcoll): @with_input_types(T) -@with_output_types(Set[T]) +@with_output_types(set[T]) class ToSetCombineFn(core.CombineFn): """CombineFn for ToSet.""" def create_accumulator(self): @@ -941,17 +937,17 @@ def expand(self, pcoll): return ( pcoll | core.ParDo(self.add_timestamp).with_output_types( - Tuple[T, TimestampType]) + tuple[T, TimestampType]) | core.CombineGlobally(LatestCombineFn())) else: return ( pcoll | core.ParDo(self.add_timestamp).with_output_types( - Tuple[T, TimestampType]) + tuple[T, TimestampType]) | core.CombineGlobally(LatestCombineFn()).without_defaults()) - @with_input_types(Tuple[K, V]) - @with_output_types(Tuple[K, V]) + @with_input_types(tuple[K, V]) + @with_output_types(tuple[K, V]) class PerKey(ptransform.PTransform): """Compute elements with the latest timestamp for each key from a keyed PCollection""" @@ -964,11 +960,11 @@ def expand(self, pcoll): return ( pcoll | core.ParDo(self.add_timestamp).with_output_types( - Tuple[K, Tuple[T, TimestampType]]) + tuple[K, tuple[T, TimestampType]]) | core.CombinePerKey(LatestCombineFn())) -@with_input_types(Tuple[T, TimestampType]) +@with_input_types(tuple[T, TimestampType]) @with_output_types(T) class LatestCombineFn(core.CombineFn): """CombineFn to get the element with the latest timestamp diff --git a/sdks/python/apache_beam/transforms/enrichment.py b/sdks/python/apache_beam/transforms/enrichment.py index 5bb1e2024e79..192aff6db4f9 100644 --- a/sdks/python/apache_beam/transforms/enrichment.py +++ b/sdks/python/apache_beam/transforms/enrichment.py @@ -15,10 +15,9 @@ # limitations under the License. # import logging +from collections.abc import Callable from datetime import timedelta from typing import Any -from typing import Callable -from typing import Dict from typing import Optional from typing import TypeVar from typing import Union @@ -44,7 +43,7 @@ InputT = TypeVar('InputT') OutputT = TypeVar('OutputT') -JoinFn = Callable[[Dict[str, Any], Dict[str, Any]], beam.Row] +JoinFn = Callable[[dict[str, Any], dict[str, Any]], beam.Row] _LOGGER = logging.getLogger(__name__) @@ -56,14 +55,14 @@ def has_valid_redis_address(host: str, port: int) -> bool: return False -def cross_join(left: Dict[str, Any], right: Dict[str, Any]) -> beam.Row: +def cross_join(left: dict[str, Any], right: dict[str, Any]) -> beam.Row: """performs a cross join on two `dict` objects. Joins the columns of the right row onto the left row. Args: - left (Dict[str, Any]): input request dictionary. - right (Dict[str, Any]): response dictionary from the API. + left (dict[str, Any]): input request dictionary. + right (dict[str, Any]): response dictionary from the API. Returns: `beam.Row` containing the merged columns. 
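The import pattern above is the one applied throughout this patch: the builtin containers (list, dict, tuple, set) replace their typing aliases, and the callable/iterable ABCs now come from collections.abc, both of which have been subscriptable since Python 3.9 (PEP 585). A minimal sketch mirroring the `JoinFn` alias and `cross_join` above, with a plain-dict return for brevity; `merge_rows` is an illustrative name, not Beam code:

    from collections.abc import Callable   # was: from typing import Callable
    from typing import Any                 # Any itself still lives in typing

    # A join callback receives the request dict and the response dict.
    JoinFn = Callable[[dict[str, Any], dict[str, Any]], dict[str, Any]]

    def merge_rows(left: dict[str, Any], right: dict[str, Any]) -> dict[str, Any]:
        # Join the columns of the right row onto the left row.
        return {**left, **right}

    merge: JoinFn = merge_rows
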
diff --git a/sdks/python/apache_beam/transforms/enrichment_it_test.py b/sdks/python/apache_beam/transforms/enrichment_it_test.py index 4a45fae2e869..43a0c3903074 100644 --- a/sdks/python/apache_beam/transforms/enrichment_it_test.py +++ b/sdks/python/apache_beam/transforms/enrichment_it_test.py @@ -16,9 +16,7 @@ # import time import unittest -from typing import List from typing import NamedTuple -from typing import Tuple from typing import Union import pytest @@ -61,7 +59,7 @@ def __init__(self, url: str): def __call__(self, request: Request, *args, **kwargs): """Overrides ``Caller``'s call method invoking the ``EchoServiceGrpc``'s HTTP handler with an `dict`, returning - either a successful ``Tuple[dict,dict]`` or throwing either a + either a successful ``tuple[dict,dict]`` or throwing either a ``UserCodeExecutionException``, ``UserCodeTimeoutException``, or a ``UserCodeQuotaException``. """ @@ -93,7 +91,7 @@ def __call__(self, request: Request, *args, **kwargs): class ValidateFields(beam.DoFn): """ValidateFields validates if a PCollection of `beam.Row` has certain fields.""" - def __init__(self, n_fields: int, fields: List[str]): + def __init__(self, n_fields: int, fields: list[str]): self.n_fields = n_fields self._fields = fields @@ -124,7 +122,7 @@ def setUpClass(cls) -> None: @classmethod def _get_client_and_options( - cls) -> Tuple[SampleHTTPEnrichment, EchoITOptions]: + cls) -> tuple[SampleHTTPEnrichment, EchoITOptions]: assert cls.options is not None assert cls.client is not None return cls.client, cls.options diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index 77704e0522b2..189cbd172695 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -25,19 +25,14 @@ import logging import sys import tempfile +from collections.abc import Callable +from collections.abc import Iterable +from collections.abc import Iterator +from collections.abc import Mapping from types import MappingProxyType from typing import TYPE_CHECKING from typing import Any -from typing import Callable -from typing import Dict -from typing import Iterable -from typing import Iterator -from typing import List -from typing import Mapping from typing import Optional -from typing import Set -from typing import Tuple -from typing import Type from typing import TypeVar from typing import Union from typing import overload @@ -108,8 +103,8 @@ class Environment(object): For internal use only. No backwards compatibility guarantees. """ - _known_urns = {} # type: Dict[str, Tuple[Optional[type], ConstructorFn]] - _urn_to_env_cls = {} # type: Dict[str, type] + _known_urns = {} # type: dict[str, tuple[Optional[type], ConstructorFn]] + _urn_to_env_cls = {} # type: dict[str, type] def __init__(self, capabilities=(), # type: Iterable[str] @@ -141,7 +136,7 @@ def artifacts(self): return self._artifacts def to_runner_api_parameter(self, context): - # type: (PipelineContext) -> Tuple[str, Optional[Union[message.Message, bytes, str]]] + # type: (PipelineContext) -> tuple[str, Optional[Union[message.Message, bytes, str]]] raise NotImplementedError def capabilities(self): @@ -157,7 +152,7 @@ def resource_hints(self): def register_urn( cls, urn, # type: str - parameter_type, # type: Type[T] + parameter_type, # type: type[T] ): # type: (...) 
-> Callable[[Union[type, Callable[[T, Iterable[str], PipelineContext], Any]]], Callable[[T, Iterable[str], PipelineContext], Any]] pass @@ -176,7 +171,7 @@ def register_urn( @overload def register_urn(cls, urn, # type: str - parameter_type, # type: Type[T] + parameter_type, # type: type[T] constructor # type: Callable[[T, Iterable[str], Iterable[beam_runner_api_pb2.ArtifactInformation], PipelineContext], Any] ): # type: (...) -> None @@ -215,7 +210,7 @@ def register(constructor): @classmethod def get_env_cls_from_urn(cls, urn): - # type: (str) -> Type[Environment] + # type: (str) -> type[Environment] return cls._urn_to_env_cls[urn] def to_runner_api(self, context): @@ -250,7 +245,7 @@ def from_runner_api(cls, @classmethod def from_options(cls, options): - # type: (Type[EnvironmentT], PortableOptions) -> EnvironmentT + # type: (type[EnvironmentT], PortableOptions) -> EnvironmentT """Creates an Environment object from PortableOptions. @@ -333,7 +328,7 @@ def __repr__(self): return 'DockerEnvironment(container_image=%s)' % self.container_image def to_runner_api_parameter(self, context): - # type: (PipelineContext) -> Tuple[str, beam_runner_api_pb2.DockerPayload] + # type: (PipelineContext) -> tuple[str, beam_runner_api_pb2.DockerPayload] return ( common_urns.environments.DOCKER.urn, beam_runner_api_pb2.DockerPayload(container_image=self.container_image)) @@ -444,7 +439,7 @@ def __repr__(self): return 'ProcessEnvironment(%s)' % ','.join(repr_parts) def to_runner_api_parameter(self, context): - # type: (PipelineContext) -> Tuple[str, beam_runner_api_pb2.ProcessPayload] + # type: (PipelineContext) -> tuple[str, beam_runner_api_pb2.ProcessPayload] return ( common_urns.environments.PROCESS.urn, beam_runner_api_pb2.ProcessPayload( @@ -539,7 +534,7 @@ def __repr__(self): return 'ExternalEnvironment(url=%s,params=%s)' % (self.url, self.params) def to_runner_api_parameter(self, context): - # type: (PipelineContext) -> Tuple[str, beam_runner_api_pb2.ExternalPayload] + # type: (PipelineContext) -> tuple[str, beam_runner_api_pb2.ExternalPayload] return ( common_urns.environments.EXTERNAL.urn, beam_runner_api_pb2.ExternalPayload( @@ -606,7 +601,7 @@ def resolve_anyof_environment(env_proto, *preferred_types): @Environment.register_urn(python_urns.EMBEDDED_PYTHON, None) class EmbeddedPythonEnvironment(Environment): def to_runner_api_parameter(self, context): - # type: (PipelineContext) -> Tuple[str, None] + # type: (PipelineContext) -> tuple[str, None] return python_urns.EMBEDDED_PYTHON, None @staticmethod @@ -672,7 +667,7 @@ def __repr__(self): return 'EmbeddedPythonGrpcEnvironment(%s)' % ','.join(repr_parts) def to_runner_api_parameter(self, context): - # type: (PipelineContext) -> Tuple[str, bytes] + # type: (PipelineContext) -> tuple[str, bytes] params = {} if self.state_cache_size is not None: params['state_cache_size'] = self.state_cache_size @@ -720,7 +715,7 @@ def from_options(cls, options): @staticmethod def parse_config(s): - # type: (str) -> Dict[str, Any] + # type: (str) -> dict[str, Any] if looks_like_json(s): config_dict = json.loads(s) if 'state_cache_size' in config_dict: @@ -743,7 +738,7 @@ def default(cls): class PythonLoopbackEnvironment(EmbeddedPythonEnvironment): """Used as a stub when the loopback worker has not yet been started.""" def to_runner_api_parameter(self, context): - # type: (PipelineContext) -> Tuple[str, None] + # type: (PipelineContext) -> tuple[str, None] return python_urns.EMBEDDED_PYTHON_LOOPBACK, None @staticmethod @@ -785,7 +780,7 @@ def __repr__(self): return 
'SubprocessSDKEnvironment(command_string=%s)' % self.command_string def to_runner_api_parameter(self, context): - # type: (PipelineContext) -> Tuple[str, bytes] + # type: (PipelineContext) -> tuple[str, bytes] return python_urns.SUBPROCESS_SDK, self.command_string.encode('utf-8') @staticmethod @@ -823,7 +818,7 @@ def __init__(self, environments): self._environments = environments def to_runner_api_parameter(self, context): - # type: (PipelineContext) -> Tuple[str, beam_runner_api_pb2.AnyOfEnvironmentPayload] + # type: (PipelineContext) -> tuple[str, beam_runner_api_pb2.AnyOfEnvironmentPayload] return ( common_urns.environments.ANYOF.urn, beam_runner_api_pb2.AnyOfEnvironmentPayload( @@ -855,7 +850,7 @@ def create_proto( class PyPIArtifactRegistry(object): - _registered_artifacts = set() # type: Set[Tuple[str, str]] + _registered_artifacts = set() # type: set[tuple[str, str]] @classmethod def register_artifact(cls, name, version): @@ -868,7 +863,7 @@ def get_artifacts(cls): def python_sdk_capabilities(): - # type: () -> List[str] + # type: () -> list[str] return list(_python_sdk_capabilities_iter()) diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 83c439ca8ddd..84f558a66d21 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -29,7 +29,6 @@ import uuid from collections import OrderedDict from collections import namedtuple -from typing import Dict import grpc @@ -653,8 +652,8 @@ def __init__(self, urn, payload, expansion_service=None): payload.payload() if isinstance(payload, PayloadBuilder) else payload) self._expansion_service = expansion_service self._external_namespace = self._fresh_namespace() - self._inputs = {} # type: Dict[str, pvalue.PCollection] - self._outputs = {} # type: Dict[str, pvalue.PCollection] + self._inputs = {} # type: dict[str, pvalue.PCollection] + self._outputs = {} # type: dict[str, pvalue.PCollection] def with_output_types(self, *args, **kwargs): return WithTypeHints.with_output_types(self, *args, **kwargs) diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py index 117c7f7c9b93..db297540471a 100644 --- a/sdks/python/apache_beam/transforms/external_transform_provider.py +++ b/sdks/python/apache_beam/transforms/external_transform_provider.py @@ -20,9 +20,6 @@ from collections import namedtuple from inspect import Parameter from inspect import Signature -from typing import Dict -from typing import List -from typing import Tuple from apache_beam.transforms import PTransform from apache_beam.transforms.external import BeamJarExpansionService @@ -46,7 +43,7 @@ def snake_case_to_upper_camel_case(string): def get_config_with_descriptions( - schematransform: SchemaTransformsConfig) -> Dict[str, ParamInfo]: + schematransform: SchemaTransformsConfig) -> dict[str, ParamInfo]: # Prepare a configuration schema that includes types and descriptions schema = named_tuple_to_schema(schematransform.configuration_schema) descriptions = schematransform.configuration_schema._field_descriptions @@ -63,7 +60,7 @@ def get_config_with_descriptions( def _generate_signature(schematransform: SchemaTransformsConfig) -> Signature: schema = named_tuple_to_schema(schematransform.configuration_schema) descriptions = schematransform.configuration_schema._field_descriptions - params: List[Parameter] = [] + params: list[Parameter] = [] for field in schema.fields: annotation = 
str(typing_from_runner_api(field.type)) description = descriptions[field.name] @@ -88,7 +85,7 @@ class ExternalTransform(PTransform): # creating an ExternalTransform type default_expansion_service = None identifier: str = "" - configuration_schema: Dict[str, ParamInfo] = {} + configuration_schema: dict[str, ParamInfo] = {} def __init__(self, expansion_service=None, **kwargs): self._kwargs = kwargs @@ -195,8 +192,8 @@ def __init__(self, expansion_services, urn_pattern=STANDARD_URN_PATTERN): By default, the following pattern is used: [{STANDARD_URN_PATTERN}] """ self._urn_pattern = urn_pattern - self._transforms: Dict[str, type(ExternalTransform)] = {} - self._name_to_urn: Dict[str, str] = {} + self._transforms: dict[str, type(ExternalTransform)] = {} + self._name_to_urn: dict[str, str] = {} if isinstance(expansion_services, set): expansion_services = list(expansion_services) @@ -257,11 +254,11 @@ def _create_wrappers(self): for transform in self._transforms.values(): setattr(self, transform.__name__, transform) - def get_available(self) -> List[Tuple[str, str]]: + def get_available(self) -> list[tuple[str, str]]: """Get a list of available ExternalTransform names and identifiers""" return list(self._name_to_urn.items()) - def get_all(self) -> Dict[str, ExternalTransform]: + def get_all(self) -> dict[str, ExternalTransform]: """Get all ExternalTransform""" return self._transforms diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index 22ee15b1de1c..9b957f9250f8 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -65,7 +65,6 @@ """ from typing import Any -from typing import Dict from typing import Optional import yaml @@ -95,7 +94,7 @@ class Read(PTransform): def __init__( self, source: str, - config: Optional[Dict[str, Any]] = None, + config: Optional[dict[str, Any]] = None, config_url: Optional[str] = None, expansion_service=None): super().__init__() @@ -135,7 +134,7 @@ class Write(PTransform): def __init__( self, sink: str, - config: Optional[Dict[str, Any]] = None, + config: Optional[dict[str, Any]] = None, config_url: Optional[str] = None, expansion_service=None): super().__init__() diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 4848dc4aade8..7d36abf638f8 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -47,19 +47,15 @@ class and wrapper class that allows lambda functions to be used as import sys import threading import warnings +from collections.abc import Callable +from collections.abc import Mapping +from collections.abc import Sequence from functools import reduce from functools import wraps from typing import TYPE_CHECKING from typing import Any -from typing import Callable -from typing import Dict from typing import Generic -from typing import List -from typing import Mapping from typing import Optional -from typing import Sequence -from typing import Tuple -from typing import Type from typing import TypeVar from typing import Union from typing import overload @@ -149,7 +145,7 @@ def visit(self, node, replacements): # in-process, in eager mode. This cache allows the same _MaterializedResult # object to be accessed and used despite Runner API round-trip serialization. 
_pipeline_materialization_cache = { -} # type: Dict[Tuple[int, int], Dict[int, _MaterializedResult]] +} # type: dict[tuple[int, int], dict[int, _MaterializedResult]] _pipeline_materialization_lock = threading.Lock() @@ -200,7 +196,7 @@ def __init__(self, pipeline_id, result_id): # type: (int, int) -> None self._pipeline_id = pipeline_id self._result_id = result_id - self.elements = [] # type: List[Any] + self.elements = [] # type: list[Any] def __reduce__(self): # When unpickled (during Runner API roundtrip serailization), get the @@ -376,7 +372,7 @@ def default_label(self): # type: () -> str return self.__class__.__name__ - def annotations(self) -> Dict[str, Union[bytes, str, message.Message]]: + def annotations(self) -> dict[str, Union[bytes, str, message.Message]]: return { 'python_type': # f'{self.__class__.__module__}.{self.__class__.__qualname__}' @@ -458,11 +454,11 @@ def with_resource_hints(self, **kwargs): # type: (...) -> PTransform return self def get_resource_hints(self): - # type: () -> Dict[str, bytes] + # type: () -> dict[str, bytes] if '_resource_hints' not in self.__dict__: # PTransform subclasses don't always call super(), so prefer lazy # initialization. By default, transforms don't have any resource hints. - self._resource_hints = {} # type: Dict[str, bytes] + self._resource_hints = {} # type: dict[str, bytes] return self._resource_hints def type_check_inputs(self, pvalueish): @@ -657,7 +653,7 @@ def _pvaluish_from_dict(self, input_dict): return input_dict def _named_inputs(self, main_inputs, side_inputs): - # type: (Mapping[str, pvalue.PValue], Sequence[Any]) -> Dict[str, pvalue.PValue] + # type: (Mapping[str, pvalue.PValue], Sequence[Any]) -> dict[str, pvalue.PValue] """Returns the dictionary of named inputs (including side inputs) as they should be named in the beam proto. @@ -672,7 +668,7 @@ def _named_inputs(self, main_inputs, side_inputs): return dict(main_inputs, **named_side_inputs) def _named_outputs(self, outputs): - # type: (Dict[object, pvalue.PCollection]) -> Dict[str, pvalue.PCollection] + # type: (dict[object, pvalue.PCollection]) -> dict[str, pvalue.PCollection] """Returns the dictionary of named outputs as they should be named in the beam proto. @@ -684,14 +680,14 @@ def _named_outputs(self, outputs): if isinstance(output, pvalue.PCollection) } - _known_urns = {} # type: Dict[str, Tuple[Optional[type], ConstructorFn]] + _known_urns = {} # type: dict[str, tuple[Optional[type], ConstructorFn]] @classmethod @overload def register_urn( cls, urn, # type: str - parameter_type, # type: Type[T] + parameter_type, # type: type[T] ): # type: (...) -> Callable[[Union[type, Callable[[beam_runner_api_pb2.PTransform, T, PipelineContext], Any]]], Callable[[T, PipelineContext], Any]] pass @@ -710,7 +706,7 @@ def register_urn( @overload def register_urn(cls, urn, # type: str - parameter_type, # type: Type[T] + parameter_type, # type: type[T] constructor # type: Callable[[beam_runner_api_pb2.PTransform, T, PipelineContext], Any] ): # type: (...) -> None @@ -776,21 +772,21 @@ def to_runner_api_parameter( self, unused_context # type: PipelineContext ): - # type: (...) -> Tuple[str, Optional[Union[message.Message, bytes, str]]] + # type: (...) -> tuple[str, Optional[Union[message.Message, bytes, str]]] # The payload here is just to ease debugging. 
return ( python_urns.GENERIC_COMPOSITE_TRANSFORM, getattr(self, '_fn_api_payload', str(self))) def to_runner_api_pickled(self, unused_context): - # type: (PipelineContext) -> Tuple[str, bytes] + # type: (PipelineContext) -> tuple[str, bytes] return (python_urns.PICKLED_TRANSFORM, pickler.dumps(self)) def runner_api_requires_keyed_input(self): return False def _add_type_constraint_from_consumer(self, full_label, input_type_hints): - # type: (str, Tuple[str, Any]) -> None + # type: (str, tuple[str, Any]) -> None """Adds a consumer transform's input type hints to our output type constraints, which is used during performance runtime type-checking. diff --git a/sdks/python/apache_beam/transforms/resources.py b/sdks/python/apache_beam/transforms/resources.py index 04f38d368122..139b5e3fcc5e 100644 --- a/sdks/python/apache_beam/transforms/resources.py +++ b/sdks/python/apache_beam/transforms/resources.py @@ -26,9 +26,8 @@ """ import re +from collections.abc import Mapping from typing import Any -from typing import Dict -from typing import Mapping from typing import Optional from apache_beam.options.pipeline_options import PipelineOptions @@ -51,11 +50,11 @@ class ResourceHint: # A unique URN, one per Resource Hint class. urn: Optional[str] = None - _urn_to_known_hints: Dict[str, type] = {} - _name_to_known_hints: Dict[str, type] = {} + _urn_to_known_hints: dict[str, type] = {} + _name_to_known_hints: dict[str, type] = {} @classmethod - def parse(cls, value: str) -> Dict[str, bytes]: + def parse(cls, value: str) -> dict[str, bytes]: """Describes how to parse the hint. Override to specify a custom parsing logic.""" assert cls.urn is not None @@ -159,7 +158,7 @@ class MinRamHint(ResourceHint): urn = resource_hints.MIN_RAM_BYTES.urn @classmethod - def parse(cls, value: str) -> Dict[str, bytes]: + def parse(cls, value: str) -> dict[str, bytes]: return {cls.urn: ResourceHint._parse_storage_size_str(value)} @classmethod @@ -186,7 +185,7 @@ def get_merged_value(cls, outer_value: bytes, inner_value: bytes) -> bytes: ResourceHint.register_resource_hint('cpuCount', CpuCountHint) -def parse_resource_hints(hints: Dict[Any, Any]) -> Dict[str, bytes]: +def parse_resource_hints(hints: dict[Any, Any]) -> dict[str, bytes]: parsed_hints = {} for hint, value in hints.items(): try: @@ -202,7 +201,7 @@ def parse_resource_hints(hints: Dict[Any, Any]) -> Dict[str, bytes]: def resource_hints_from_options( - options: Optional[PipelineOptions]) -> Dict[str, bytes]: + options: Optional[PipelineOptions]) -> dict[str, bytes]: if options is None: return {} hints = {} @@ -219,7 +218,7 @@ def resource_hints_from_options( def merge_resource_hints( outer_hints: Mapping[str, bytes], - inner_hints: Mapping[str, bytes]) -> Dict[str, bytes]: + inner_hints: Mapping[str, bytes]) -> dict[str, bytes]: merged_hints = dict(inner_hints) for urn, outer_value in outer_hints.items(): if urn in inner_hints: diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py index 0ff2a388b9e1..7d72a02f8874 100644 --- a/sdks/python/apache_beam/transforms/sideinputs.py +++ b/sdks/python/apache_beam/transforms/sideinputs.py @@ -27,10 +27,9 @@ # pytype: skip-file import re +from collections.abc import Callable from typing import TYPE_CHECKING from typing import Any -from typing import Callable -from typing import Dict from apache_beam.transforms import window @@ -82,7 +81,7 @@ def __init__(self, view_class: 'pvalue.AsSideInput', view_options, iterable): self._view_class = view_class self._view_options = 
view_options self._iterable = iterable - self._cache: Dict[window.BoundedWindow, Any] = {} + self._cache: dict[window.BoundedWindow, Any] = {} def __getitem__(self, window: window.BoundedWindow) -> Any: if window not in self._cache: diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index 0d56b60b050f..28d42245d8bd 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -34,10 +34,8 @@ import logging import math import typing +from collections.abc import Callable from typing import Any -from typing import Callable -from typing import List -from typing import Tuple from apache_beam import coders from apache_beam import typehints @@ -322,7 +320,7 @@ def _display_data(num_quantiles, key, reverse, weighted, input_batched): @typehints.with_input_types( typehints.Union[typing.Sequence[T], typing.Tuple[T, float]]) - @typehints.with_output_types(typing.List[T]) + @typehints.with_output_types(list[T]) class Globally(PTransform): """ PTransform takes PCollection and returns a list whose single value is @@ -374,9 +372,9 @@ def display_data(self): input_batched=self._input_batched) @typehints.with_input_types( - typehints.Union[typing.Tuple[K, V], - typing.Tuple[K, typing.Tuple[V, float]]]) - @typehints.with_output_types(typing.Tuple[K, typing.List[V]]) + typehints.Union[tuple[K, V], + tuple[K, tuple[V, float]]]) + @typehints.with_output_types(tuple[K, list[V]]) class PerKey(PTransform): """ PTransform takes PCollection of KV and returns a list based on each key @@ -453,7 +451,7 @@ def __init__(self, buffer_size, num_buffers, weighted, key, reverse): self.less_than = lambda a, b: key(a) < key(b) def get_argsort_key(self, elements): - # type: (List) -> Callable[[int], Any] + # type: (list) -> Callable[[int], Any] """Returns a key for sorting indices of elements by element's value.""" if self.key is None: @@ -478,7 +476,7 @@ class _QuantileBuffer(object): &type=pdf and ApproximateQuantilesCombineFn for further information)""" def __init__( self, elements, weights, weighted, level=0, min_val=None, max_val=None): - # type: (List, List, bool, int, Any, Any) -> None + # type: (list, list, bool, int, Any, Any) -> None self.elements = elements # In non-weighted case weights contains a single element representing weight # of the buffer in the sense of the original algorithm. In weighted case, @@ -510,7 +508,7 @@ class _QuantileState(object): Compact summarization of a collection on which quantiles can be estimated. 
""" def __init__(self, unbuffered_elements, unbuffered_weights, buffers, spec): - # type: (List, List, List[_QuantileBuffer], _QuantileSpec) -> None + # type: (list, list, list[_QuantileBuffer], _QuantileSpec) -> None self.buffers = buffers self.spec = spec if spec.weighted: @@ -544,7 +542,7 @@ def is_empty(self): return not self.unbuffered_elements and not self.buffers def _add_unbuffered(self, elements, offset_fn): - # type: (List, Any) -> None + # type: (list, Any) -> None """ Add elements to the unbuffered list, creating new buffers and @@ -570,7 +568,7 @@ def _add_unbuffered(self, elements, offset_fn): self.collapse_if_needed(offset_fn) def _add_unbuffered_weighted(self, elements, offset_fn): - # type: (List, Any) -> None + # type: (list, Any) -> None """ Add elements with weights to the unbuffered list, creating new buffers and @@ -657,7 +655,7 @@ def collapse_if_needed(self, offset_fn): def _collapse(buffers, offset_fn, spec): - # type: (List[_QuantileBuffer], Any, _QuantileSpec) -> _QuantileBuffer + # type: (list[_QuantileBuffer], Any, _QuantileSpec) -> _QuantileBuffer """ Approximates elements from multiple buffers and produces a single buffer. @@ -686,7 +684,7 @@ def _collapse(buffers, offset_fn, spec): def _interpolate(buffers, count, step, offset, spec): - # type: (List[_QuantileBuffer], int, float, float, _QuantileSpec) -> Tuple[List, List, Any, Any] + # type: (list[_QuantileBuffer], int, float, float, _QuantileSpec) -> tuple[list, list, Any, Any] """ Emulates taking the ordered union of all elements in buffers, repeated @@ -936,7 +934,7 @@ def add_input(self, quantile_state, element): return quantile_state def _add_inputs(self, quantile_state, elements): - # type: (_QuantileState, List) -> _QuantileState + # type: (_QuantileState, list) -> _QuantileState """ Add a batch of elements to the collection being summarized by quantile diff --git a/sdks/python/apache_beam/transforms/timestamped_value_type_test.py b/sdks/python/apache_beam/transforms/timestamped_value_type_test.py index 46449bb1ef72..3852b9a85bf1 100644 --- a/sdks/python/apache_beam/transforms/timestamped_value_type_test.py +++ b/sdks/python/apache_beam/transforms/timestamped_value_type_test.py @@ -17,8 +17,6 @@ import unittest from typing import Any -from typing import Dict -from typing import List from typing import TypeVar import apache_beam as beam @@ -28,20 +26,20 @@ T = TypeVar("T") -def ConvertToTimestampedValue(plant: Dict[str, Any]) -> TimestampedValue[str]: +def ConvertToTimestampedValue(plant: dict[str, Any]) -> TimestampedValue[str]: return TimestampedValue[str](plant["name"], plant["season"]) -def ConvertToTimestampedValue_1(plant: Dict[str, Any]) -> TimestampedValue: +def ConvertToTimestampedValue_1(plant: dict[str, Any]) -> TimestampedValue: return TimestampedValue(plant["name"], plant["season"]) def ConvertToTimestampedValue_2( - plant: Dict[str, Any]) -> TimestampedValue[List[str]]: - return TimestampedValue[List[str]](plant["name"], plant["season"]) + plant: dict[str, Any]) -> TimestampedValue[list[str]]: + return TimestampedValue[list[str]](plant["name"], plant["season"]) -def ConvertToTimestampedValue_3(plant: Dict[str, Any]) -> TimestampedValue[T]: +def ConvertToTimestampedValue_3(plant: dict[str, Any]) -> TimestampedValue[T]: return TimestampedValue[T](plant["name"], plant["season"]) diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py index 3b876bf9dbfb..d9c7eb84f9f3 100644 --- a/sdks/python/apache_beam/transforms/userstate.py +++ 
b/sdks/python/apache_beam/transforms/userstate.py @@ -22,15 +22,12 @@ import collections import types +from collections.abc import Callable +from collections.abc import Iterable from typing import TYPE_CHECKING from typing import Any -from typing import Callable -from typing import Dict -from typing import Iterable from typing import NamedTuple from typing import Optional -from typing import Set -from typing import Tuple from typing import TypeVar from apache_beam.coders import Coder @@ -167,7 +164,7 @@ def to_runner_api( [ ('user_key', Any), ('dynamic_timer_tag', str), - ('windows', Tuple['windowed_value.BoundedWindow', ...]), + ('windows', tuple['windowed_value.BoundedWindow', ...]), ('clear_bit', bool), ('fire_timestamp', Optional['Timestamp']), ('hold_timestamp', Optional['Timestamp']), @@ -228,7 +225,7 @@ def _inner(method: CallableT) -> CallableT: return _inner -def get_dofn_specs(dofn: 'DoFn') -> Tuple[Set[StateSpec], Set[TimerSpec]]: +def get_dofn_specs(dofn: 'DoFn') -> tuple[set[StateSpec], set[TimerSpec]]: """Gets the state and timer specs for a DoFn, if any. Args: @@ -320,7 +317,7 @@ def set(self, timestamp: Timestamp, dynamic_timer_tag: str = '') -> None: class RuntimeTimer(BaseTimer): """Timer interface object passed to user code.""" def __init__(self) -> None: - self._timer_recordings: Dict[str, _TimerTuple] = {} + self._timer_recordings: dict[str, _TimerTuple] = {} self._cleared = False self._new_timestamp: Optional[Timestamp] = None @@ -385,15 +382,15 @@ class CombiningValueRuntimeState(AccumulatingRuntimeState): class OrderedListRuntimeState(AccumulatingRuntimeState): """Ordered list state interface object passed to user code.""" - def read(self) -> Iterable[Tuple[Timestamp, Any]]: + def read(self) -> Iterable[tuple[Timestamp, Any]]: raise NotImplementedError(type(self)) - def add(self, value: Tuple[Timestamp, Any]) -> None: + def add(self, value: tuple[Timestamp, Any]) -> None: raise NotImplementedError(type(self)) def read_range( self, min_time_stamp: Timestamp, - limit_time_stamp: Timestamp) -> Iterable[Tuple[Timestamp, Any]]: + limit_time_stamp: Timestamp) -> Iterable[tuple[Timestamp, Any]]: raise NotImplementedError(type(self)) def clear_range( diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py index 5dd6c61d6add..8f2cb34f982e 100644 --- a/sdks/python/apache_beam/transforms/userstate_test.py +++ b/sdks/python/apache_beam/transforms/userstate_test.py @@ -20,7 +20,6 @@ import unittest from typing import Any -from typing import List import mock import pytest @@ -437,7 +436,7 @@ def __repr__(self): class StatefulDoFnOnDirectRunnerTest(unittest.TestCase): # pylint: disable=expression-not-assigned - all_records: List[Any] + all_records: list[Any] def setUp(self): # Use state on the TestCase class, since other references would be pickled diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index a03652de2496..bd22b13a1c08 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -28,12 +28,10 @@ import threading import time import uuid +from collections.abc import Callable +from collections.abc import Iterable from typing import TYPE_CHECKING from typing import Any -from typing import Callable -from typing import Iterable -from typing import List -from typing import Tuple from typing import TypeVar from typing import Union @@ -265,7 +263,7 @@ def collect_values(key, tagged_values): @ptransform_fn 
-@typehints.with_input_types(Tuple[K, V]) +@typehints.with_input_types(tuple[K, V]) @typehints.with_output_types(K) def Keys(pcoll, label='Keys'): # pylint: disable=invalid-name """Produces a PCollection of first elements of 2-tuples in a PCollection.""" @@ -273,7 +271,7 @@ def Keys(pcoll, label='Keys'): # pylint: disable=invalid-name @ptransform_fn -@typehints.with_input_types(Tuple[K, V]) +@typehints.with_input_types(tuple[K, V]) @typehints.with_output_types(V) def Values(pcoll, label='Values'): # pylint: disable=invalid-name """Produces a PCollection of second elements of 2-tuples in a PCollection.""" @@ -281,8 +279,8 @@ def Values(pcoll, label='Values'): # pylint: disable=invalid-name @ptransform_fn -@typehints.with_input_types(Tuple[K, V]) -@typehints.with_output_types(Tuple[V, K]) +@typehints.with_input_types(tuple[K, V]) +@typehints.with_output_types(tuple[V, K]) def KvSwap(pcoll, label='KvSwap'): # pylint: disable=invalid-name """Produces a PCollection reversing 2-tuples in a PCollection.""" return pcoll | label >> MapTuple(lambda k, v: (v, k)) @@ -784,7 +782,7 @@ def process(self, element): @typehints.with_input_types(T) -@typehints.with_output_types(List[T]) +@typehints.with_output_types(list[T]) class BatchElements(PTransform): """A Transform that batches elements for amortized processing. @@ -795,7 +793,7 @@ class BatchElements(PTransform): where the per element cost is (often significantly) smaller than the fixed cost and could be amortized over multiple elements. It consumes a PCollection - of element type T and produces a PCollection of element type List[T]. + of element type T and produces a PCollection of element type list[T]. This transform attempts to find the best batch size between the minimim and maximum parameters by profiling the time taken by (fused) downstream @@ -924,8 +922,8 @@ def get_window_coder(self): return self._window_coder -@typehints.with_input_types(Tuple[K, V]) -@typehints.with_output_types(Tuple[K, V]) +@typehints.with_input_types(tuple[K, V]) +@typehints.with_output_types(tuple[K, V]) class ReshufflePerKey(PTransform): """PTransform that returns a PCollection equivalent to its input, but operationally provides some of the side effects of a GroupByKey, @@ -1017,13 +1015,13 @@ def expand(self, pcoll): return ( pcoll | 'AddRandomKeys' >> Map(lambda t: (random.randrange(0, self.num_buckets), t) - ).with_input_types(T).with_output_types(Tuple[int, T]) + ).with_input_types(T).with_output_types(tuple[int, T]) | ReshufflePerKey() | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( - Tuple[int, T]).with_output_types(T)) + tuple[int, T]).with_output_types(T)) def to_runner_api_parameter(self, unused_context): - # type: (PipelineContext) -> Tuple[str, None] + # type: (PipelineContext) -> tuple[str, None] return common_urns.composites.RESHUFFLE.urn, None @staticmethod @@ -1072,8 +1070,8 @@ def WithKeys(pcoll, k, *args, **kwargs): return pcoll | Map(lambda v: (k, v)) -@typehints.with_input_types(Tuple[K, V]) -@typehints.with_output_types(Tuple[K, Iterable[V]]) +@typehints.with_input_types(tuple[K, V]) +@typehints.with_output_types(tuple[K, Iterable[V]]) class GroupIntoBatches(PTransform): """PTransform that batches the input into desired batch size. Elements are buffered until they are equal to batch size provided in the argument at which @@ -1110,7 +1108,7 @@ def expand(self, pcoll): def to_runner_api_parameter( self, unused_context # type: PipelineContext - ): # type: (...) -> Tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload] + ): # type: (...) 
-> tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload] return ( common_urns.group_into_batches_components.GROUP_INTO_BATCHES.urn, self.params.get_payload()) @@ -1122,7 +1120,7 @@ def to_runner_api_parameter( def from_runner_api_parameter(unused_ptransform, proto, unused_context): return GroupIntoBatches(*_GroupIntoBatchesParams.parse_payload(proto)) - @typehints.with_input_types(Tuple[K, V]) + @typehints.with_input_types(tuple[K, V]) @typehints.with_output_types( typehints.Tuple[ ShardedKeyType[typehints.TypeVariable(K)], # type: ignore[misc] @@ -1170,7 +1168,7 @@ def expand(self, pcoll): def to_runner_api_parameter( self, unused_context # type: PipelineContext - ): # type: (...) -> Tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload] + ): # type: (...) -> tuple[str, beam_runner_api_pb2.GroupIntoBatchesPayload] return ( common_urns.composites.GROUP_INTO_BATCHES_WITH_SHARDED_KEY.urn, self.params.get_payload()) @@ -1417,8 +1415,8 @@ def add_window_info( def expand(self, pcoll): return pcoll | ParDo(self.add_window_info) - @typehints.with_input_types(Tuple[K, V]) - @typehints.with_output_types(Tuple[K, V]) + @typehints.with_input_types(tuple[K, V]) + @typehints.with_output_types(tuple[K, V]) class TimestampInValue(PTransform): """PTransform to wrap the Value in a KV pair in a TimestampedValue with the element's associated timestamp.""" @@ -1430,8 +1428,8 @@ def add_timestamp_info(element, timestamp=DoFn.TimestampParam): def expand(self, pcoll): return pcoll | ParDo(self.add_timestamp_info) - @typehints.with_input_types(Tuple[K, V]) - @typehints.with_output_types(Tuple[K, V]) + @typehints.with_input_types(tuple[K, V]) + @typehints.with_output_types(tuple[K, V]) class WindowInValue(PTransform): """PTransform to convert the Value in a KV pair into a tuple of (value, timestamp, window), with the whole element being wrapped inside a @@ -1490,7 +1488,7 @@ def _process(element): @staticmethod @typehints.with_input_types(str) - @typehints.with_output_types(List[str]) + @typehints.with_output_types(list[str]) @ptransform_fn def all_matches(pcoll, regex): """ @@ -1511,7 +1509,7 @@ def _process(element): @staticmethod @typehints.with_input_types(str) - @typehints.with_output_types(Tuple[str, str]) + @typehints.with_output_types(tuple[str, str]) @ptransform_fn def matches_kv(pcoll, regex, keyGroup, valueGroup=0): """ @@ -1558,7 +1556,7 @@ def _process(element): @staticmethod @typehints.with_input_types(str) - @typehints.with_output_types(Union[List[str], List[Tuple[str, str]]]) + @typehints.with_output_types(Union[list[str], list[tuple[str, str]]]) @ptransform_fn def find_all(pcoll, regex, group=0, outputEmpty=True): """ @@ -1587,7 +1585,7 @@ def _process(element): @staticmethod @typehints.with_input_types(str) - @typehints.with_output_types(Tuple[str, str]) + @typehints.with_output_types(tuple[str, str]) @ptransform_fn def find_kv(pcoll, regex, keyGroup, valueGroup=0): """ @@ -1644,7 +1642,7 @@ def replace_first(pcoll, regex, replacement): @staticmethod @typehints.with_input_types(str) - @typehints.with_output_types(List[str]) + @typehints.with_output_types(list[str]) @ptransform_fn def split(pcoll, regex, outputEmpty=False): """ diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index d86509c7dde3..6ffd5fa46795 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -28,8 +28,8 @@ import time import unittest import warnings +from collections.abc import Mapping from datetime 
import datetime -from typing import Mapping import pytest import pytz diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py index fc20174ca1e2..900d2b600353 100644 --- a/sdks/python/apache_beam/transforms/window.py +++ b/sdks/python/apache_beam/transforms/window.py @@ -50,11 +50,10 @@ # pytype: skip-file import abc +from collections.abc import Iterable from functools import total_ordering from typing import Any from typing import Generic -from typing import Iterable -from typing import List from typing import Optional from typing import TypeVar @@ -367,7 +366,7 @@ def windowed_value_at_end_of_window(cls, value): return cls.windowed_value(value, GlobalWindow().max_timestamp()) def assign(self, - assign_context: WindowFn.AssignContext) -> List[GlobalWindow]: + assign_context: WindowFn.AssignContext) -> list[GlobalWindow]: return [GlobalWindow()] def get_window_coder(self) -> coders.GlobalWindowCoder: @@ -419,7 +418,7 @@ def __init__(self, size: DurationTypes, offset: TimestampTypes = 0): self.size = Duration.of(size) self.offset = Timestamp.of(offset) % self.size - def assign(self, context: WindowFn.AssignContext) -> List[IntervalWindow]: + def assign(self, context: WindowFn.AssignContext) -> list[IntervalWindow]: timestamp = context.timestamp start = timestamp - (timestamp - self.offset) % self.size return [IntervalWindow(start, start + self.size)] @@ -479,7 +478,7 @@ def __init__( self.period = Duration.of(period) self.offset = Timestamp.of(offset) % period - def assign(self, context: WindowFn.AssignContext) -> List[IntervalWindow]: + def assign(self, context: WindowFn.AssignContext) -> list[IntervalWindow]: timestamp = context.timestamp start = timestamp - ((timestamp - self.offset) % self.period) return [ @@ -541,7 +540,7 @@ def __init__(self, gap_size: DurationTypes) -> None: raise ValueError('The size parameter must be strictly positive.') self.gap_size = Duration.of(gap_size) - def assign(self, context: WindowFn.AssignContext) -> List[IntervalWindow]: + def assign(self, context: WindowFn.AssignContext) -> list[IntervalWindow]: timestamp = context.timestamp return [IntervalWindow(timestamp, timestamp + self.gap_size)] @@ -549,7 +548,7 @@ def get_window_coder(self) -> coders.IntervalWindowCoder: return coders.IntervalWindowCoder() def merge(self, merge_context: WindowFn.MergeContext) -> None: - to_merge: List[BoundedWindow] = [] + to_merge: list[BoundedWindow] = [] end = MIN_TIMESTAMP for w in sorted(merge_context.windows, key=lambda w: w.start): if to_merge: From da0324c6967e69adc79f0e0b5205a206aa19dd1e Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Mon, 11 Nov 2024 13:02:36 -0500 Subject: [PATCH 2/2] formatting --- sdks/python/apache_beam/transforms/stats.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/stats.py b/sdks/python/apache_beam/transforms/stats.py index 28d42245d8bd..65adaee83c7a 100644 --- a/sdks/python/apache_beam/transforms/stats.py +++ b/sdks/python/apache_beam/transforms/stats.py @@ -372,8 +372,7 @@ def display_data(self): input_batched=self._input_batched) @typehints.with_input_types( - typehints.Union[tuple[K, V], - tuple[K, tuple[V, float]]]) + typehints.Union[tuple[K, V], tuple[K, tuple[V, float]]]) @typehints.with_output_types(tuple[K, list[V]]) class PerKey(PTransform): """
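
For context on the hunk above, a minimal usage sketch of ApproximateQuantiles.PerKey, whose type hints this follow-up commit reflows; the pipeline wiring is illustrative and not part of this patch. Under the updated hints the output PCollection is typed `tuple[K, list[V]]`:

    import apache_beam as beam

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([('a', 1), ('a', 5), ('a', 9), ('b', 2), ('b', 4)])
            | beam.ApproximateQuantiles.PerKey(3)  # three quantiles per key
            | beam.Map(print))                     # e.g. ('a', [1, 5, 9])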