From 63fc0db0b5c10a1e6170a30166dd271dc93aafd2 Mon Sep 17 00:00:00 2001 From: Jack McCluskey Date: Wed, 13 Nov 2024 10:18:44 -0500 Subject: [PATCH] Update dataframes to PEP 585 typing --- sdks/python/apache_beam/dataframe/convert.py | 8 ++--- sdks/python/apache_beam/dataframe/doctests.py | 12 +++---- .../apache_beam/dataframe/expressions.py | 33 +++++++++---------- .../apache_beam/dataframe/frame_base.py | 8 ++--- sdks/python/apache_beam/dataframe/frames.py | 7 ++-- .../apache_beam/dataframe/frames_test.py | 3 +- .../dataframe/pandas_top_level_functions.py | 2 +- .../apache_beam/dataframe/partitionings.py | 5 ++- sdks/python/apache_beam/dataframe/schemas.py | 6 ++-- .../apache_beam/dataframe/transforms.py | 23 +++++-------- 10 files changed, 44 insertions(+), 63 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/convert.py b/sdks/python/apache_beam/dataframe/convert.py index e44cc429eac1..c5a0d1025c6d 100644 --- a/sdks/python/apache_beam/dataframe/convert.py +++ b/sdks/python/apache_beam/dataframe/convert.py @@ -17,11 +17,9 @@ import inspect import warnings import weakref +from collections.abc import Iterable from typing import Any -from typing import Dict -from typing import Iterable from typing import Optional -from typing import Tuple from typing import Union import pandas as pd @@ -172,7 +170,7 @@ def to_pcollection( always_return_tuple=False, yield_elements='schemas', include_indexes=False, - pipeline=None) -> Union[pvalue.PCollection, Tuple[pvalue.PCollection, ...]]: + pipeline=None) -> Union[pvalue.PCollection, tuple[pvalue.PCollection, ...]]: """Converts one or more deferred dataframe-like objects back to a PCollection. This method creates and applies the actual Beam operations that compute @@ -252,7 +250,7 @@ def extract_input(placeholder): df for df in dataframes if df._expr._id not in TO_PCOLLECTION_CACHE ] if len(new_dataframes): - new_results: Dict[Any, pvalue.PCollection] = { + new_results: dict[Any, pvalue.PCollection] = { p: extract_input(p) for p in placeholders } | label >> transforms._DataframeExpressionsTransform( diff --git a/sdks/python/apache_beam/dataframe/doctests.py b/sdks/python/apache_beam/dataframe/doctests.py index 33faa6b58599..c57d0b0e699e 100644 --- a/sdks/python/apache_beam/dataframe/doctests.py +++ b/sdks/python/apache_beam/dataframe/doctests.py @@ -45,8 +45,6 @@ import traceback from io import StringIO from typing import Any -from typing import Dict -from typing import List import numpy as np import pandas as pd @@ -146,7 +144,7 @@ class _InMemoryResultRecorder(object): """ # Class-level value to survive pickling. - _ALL_RESULTS = {} # type: Dict[str, List[Any]] + _ALL_RESULTS = {} # type: dict[str, list[Any]] def __init__(self): self._id = id(self) @@ -729,15 +727,15 @@ def wrapper(fn): Args: optionflags (int): Passed through to doctests. - extraglobs (Dict[str,Any]): Passed through to doctests. + extraglobs (dict[str,Any]): Passed through to doctests. use_beam (bool): If true, run a Beam pipeline with partitioned input to verify the examples, else use PartitioningSession to simulate distributed execution. - skip (Dict[str,str]): A set of examples to skip entirely. + skip (dict[str,str]): A set of examples to skip entirely. If a key is '*', an example will be skipped in all test scenarios. - wont_implement_ok (Dict[str,str]): A set of examples that are allowed to + wont_implement_ok (dict[str,str]): A set of examples that are allowed to raise WontImplementError. - not_implemented_ok (Dict[str,str]): A set of examples that are allowed to + not_implemented_ok (dict[str,str]): A set of examples that are allowed to raise NotImplementedError. Returns: diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py index 91d237c7de96..af04e06bdf6b 100644 --- a/sdks/python/apache_beam/dataframe/expressions.py +++ b/sdks/python/apache_beam/dataframe/expressions.py @@ -17,10 +17,10 @@ import contextlib import random import threading +from collections.abc import Callable +from collections.abc import Iterable from typing import Any -from typing import Callable from typing import Generic -from typing import Iterable from typing import Optional from typing import TypeVar @@ -251,9 +251,9 @@ def preserves_partition_by(self) -> partitionings.Partitioning: class PlaceholderExpression(Expression): """An expression whose value must be explicitly bound in the session.""" def __init__( - self, # type: PlaceholderExpression - proxy, # type: T - reference=None, # type: Any + self, + proxy: T, + reference: Any = None, ): """Initialize a placeholder expression. @@ -282,11 +282,7 @@ def preserves_partition_by(self): class ConstantExpression(Expression): """An expression whose value is known at pipeline construction time.""" - def __init__( - self, # type: ConstantExpression - value, # type: T - proxy=None # type: Optional[T] - ): + def __init__(self, value: T, proxy: Optional[T] = None): """Initialize a constant expression. Args: @@ -319,14 +315,15 @@ def preserves_partition_by(self): class ComputedExpression(Expression): """An expression whose value must be computed at pipeline execution time.""" def __init__( - self, # type: ComputedExpression - name, # type: str - func, # type: Callable[...,T] - args, # type: Iterable[Expression] - proxy=None, # type: Optional[T] - _id=None, # type: Optional[str] - requires_partition_by=partitionings.Index(), # type: partitionings.Partitioning - preserves_partition_by=partitionings.Singleton(), # type: partitionings.Partitioning + self, + name: str, + func: Callable[..., T], + args: Iterable[Expression], + proxy: Optional[T] = None, + _id: Optional[str] = None, + requires_partition_by: partitionings.Partitioning = partitionings.Index(), + preserves_partition_by: partitionings.Partitioning = partitionings. + Singleton(), ): """Initialize a computed expression. diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py index 3b9755232e80..8e206fc5e037 100644 --- a/sdks/python/apache_beam/dataframe/frame_base.py +++ b/sdks/python/apache_beam/dataframe/frame_base.py @@ -17,15 +17,13 @@ import functools import operator import re +from collections.abc import Callable from inspect import cleandoc from inspect import getfullargspec from inspect import isclass from inspect import ismodule from inspect import unwrap from typing import Any -from typing import Callable -from typing import Dict -from typing import List from typing import Optional from typing import Tuple from typing import Union @@ -38,7 +36,7 @@ class DeferredBase(object): - _pandas_type_map: Dict[Union[type, None], type] = {} + _pandas_type_map: dict[Union[type, None], type] = {} def __init__(self, expr): self._expr = expr @@ -229,7 +227,7 @@ def _elementwise_function( def _proxy_function( func: Union[Callable, str], name: Optional[str] = None, - restrictions: Optional[Dict[str, Union[Any, List[Any]]]] = None, + restrictions: Optional[dict[str, Union[Any, list[Any]]]] = None, inplace: bool = False, base: Optional[type] = None, *, diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py index 421430ec972c..ccd01f35f87b 100644 --- a/sdks/python/apache_beam/dataframe/frames.py +++ b/sdks/python/apache_beam/dataframe/frames.py @@ -38,7 +38,6 @@ import math import re import warnings -from typing import List from typing import Optional import numpy as np @@ -2660,7 +2659,7 @@ def get(self, key, default_value=None): @frame_base.populate_defaults(pd.DataFrame) @frame_base.maybe_inplace def set_index(self, keys, **kwargs): - """``keys`` must be a ``str`` or ``List[str]``. Passing an Index or Series + """``keys`` must be a ``str`` or ``list[str]``. Passing an Index or Series is not yet supported (`Issue 20759 `_).""" if isinstance(keys, str): @@ -4574,7 +4573,7 @@ def value_counts(self, **kwargs): tshift = frame_base.wont_implement_method( DataFrameGroupBy, 'tshift', reason="deprecated") -def _maybe_project_func(projection: Optional[List[str]]): +def _maybe_project_func(projection: Optional[list[str]]): """ Returns identity func if projection is empty or None, else returns a function that projects the specified columns. """ if projection: @@ -4967,7 +4966,7 @@ def func(*args): else: raise frame_base.WontImplementError( - "others must be None, DeferredSeries, or List[DeferredSeries] " + "others must be None, DeferredSeries, or list[DeferredSeries] " f"(encountered {type(others)}). Other types are not supported " "because they make this operation sensitive to the order of the " "data.", reason="order-sensitive") diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py index 55d9fc5f4dfb..f99b77e446a8 100644 --- a/sdks/python/apache_beam/dataframe/frames_test.py +++ b/sdks/python/apache_beam/dataframe/frames_test.py @@ -18,7 +18,6 @@ import sys import unittest import warnings -from typing import Dict import numpy as np import pandas as pd @@ -1707,7 +1706,7 @@ def test_pivot_no_index_provided_on_multiindex(self): 'describe')) -def numeric_only_kwargs_for_pandas_2(agg_type: str) -> Dict[str, bool]: +def numeric_only_kwargs_for_pandas_2(agg_type: str) -> dict[str, bool]: """Get proper arguments for numeric_only. Behavior for numeric_only in these methods changed in Pandas 2 to default diff --git a/sdks/python/apache_beam/dataframe/pandas_top_level_functions.py b/sdks/python/apache_beam/dataframe/pandas_top_level_functions.py index ce36dbeb09ad..a8139675ad39 100644 --- a/sdks/python/apache_beam/dataframe/pandas_top_level_functions.py +++ b/sdks/python/apache_beam/dataframe/pandas_top_level_functions.py @@ -18,7 +18,7 @@ """ import re -from typing import Mapping +from collections.abc import Mapping import pandas as pd diff --git a/sdks/python/apache_beam/dataframe/partitionings.py b/sdks/python/apache_beam/dataframe/partitionings.py index 0ff09e111480..1fe760fe8589 100644 --- a/sdks/python/apache_beam/dataframe/partitionings.py +++ b/sdks/python/apache_beam/dataframe/partitionings.py @@ -15,9 +15,8 @@ # limitations under the License. import random +from collections.abc import Iterable from typing import Any -from typing import Iterable -from typing import Tuple from typing import TypeVar import numpy as np @@ -47,7 +46,7 @@ def __le__(self, other): return not self.is_subpartitioning_of(other) def partition_fn(self, df: Frame, - num_partitions: int) -> Iterable[Tuple[Any, Frame]]: + num_partitions: int) -> Iterable[tuple[Any, Frame]]: """A callable that actually performs the partitioning of a Frame df. This will be invoked via a FlatMap in conjunction with a GroupKey to diff --git a/sdks/python/apache_beam/dataframe/schemas.py b/sdks/python/apache_beam/dataframe/schemas.py index e70229f21f77..f849ab11e77c 100644 --- a/sdks/python/apache_beam/dataframe/schemas.py +++ b/sdks/python/apache_beam/dataframe/schemas.py @@ -24,12 +24,10 @@ # pytype: skip-file import warnings +from collections.abc import Sequence from typing import Any -from typing import Dict from typing import NamedTuple from typing import Optional -from typing import Sequence -from typing import Tuple from typing import TypeVar from typing import Union @@ -170,7 +168,7 @@ def element_typehint_from_dataframe_proxy( fields = [(column, dtype_to_fieldtype(dtype)) for (column, dtype) in output_columns] - field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] + field_options: Optional[dict[str, Sequence[tuple[str, Any]]]] if include_indexes: field_options = { index_name: [(INDEX_OPTION_NAME, None)] diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py index d0b5be4eb2a9..c8ac8174232d 100644 --- a/sdks/python/apache_beam/dataframe/transforms.py +++ b/sdks/python/apache_beam/dataframe/transforms.py @@ -16,12 +16,9 @@ import collections import logging +from collections.abc import Mapping from typing import TYPE_CHECKING from typing import Any -from typing import Dict -from typing import List -from typing import Mapping -from typing import Tuple from typing import TypeVar from typing import Union @@ -108,7 +105,7 @@ def expand(self, input_pcolls): from apache_beam.dataframe import convert # Convert inputs to a flat dict. - input_dict = _flatten(input_pcolls) # type: Dict[Any, PCollection] + input_dict = _flatten(input_pcolls) # type: dict[Any, PCollection] proxies = _flatten(self._proxy) if self._proxy is not None else { tag: None for tag in input_dict @@ -116,7 +113,7 @@ def expand(self, input_pcolls): input_frames = { k: convert.to_dataframe(pc, proxies[k]) for k, pc in input_dict.items() - } # type: Dict[Any, DeferredFrame] # noqa: F821 + } # type: dict[Any, DeferredFrame] # noqa: F821 # Apply the function. frames_input = _substitute(input_pcolls, input_frames) @@ -152,9 +149,9 @@ def expand(self, inputs): def _apply_deferred_ops( self, - inputs, # type: Dict[expressions.Expression, PCollection] - outputs, # type: Dict[Any, expressions.Expression] - ): # -> Dict[Any, PCollection] + inputs: dict[expressions.Expression, PCollection], + outputs: dict[Any, expressions.Expression], + ) -> dict[Any, PCollection]: """Construct a Beam graph that evaluates a set of expressions on a set of input PCollections. @@ -585,11 +582,9 @@ def _concat(parts): def _flatten( - valueish, # type: Union[T, List[T], Tuple[T], Dict[Any, T]] - root=(), # type: Tuple[Any, ...] - ): - # type: (...) -> Mapping[Tuple[Any, ...], T] - + valueish: Union[T, list[T], tuple[T], dict[Any, T]], + root: tuple[Any, ...] = (), +) -> Mapping[tuple[Any, ...], T]: """Given a nested structure of dicts, tuples, and lists, return a flat dictionary where the values are the leafs and the keys are the "paths" to these leaves.