Merge pull request #33098 from jrmccluskey/dataframesUpdate
Update dataframes to PEP 585 typing
liferoad authored Nov 13, 2024
2 parents 3c664e9 + c643645 commit 965c3c6
Showing 10 changed files with 45 additions and 68 deletions.
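For context, PEP 585 (Python 3.9+) makes the builtin containers subscriptable and moves the abstract interfaces to `collections.abc`, deprecating the `typing` aliases (`Dict`, `List`, `Tuple`, `Iterable`, `Callable`, `Mapping`, `Sequence`). A minimal sketch of the before/after pattern this PR applies throughout (illustrative names, not Beam code):

```python
# Before (pre-PEP 585): container generics imported from typing.
#
#   from typing import Dict, Iterable, Tuple
#
#   def index_rows(rows: Iterable[Tuple[str, int]]) -> Dict[str, int]: ...

# After (PEP 585, Python 3.9+): builtins are subscriptable, and the
# abstract interfaces come from collections.abc instead of typing.
from collections.abc import Iterable


def index_rows(rows: Iterable[tuple[str, int]]) -> dict[str, int]:
    """Build a name -> value mapping from (name, value) pairs."""
    return dict(rows)


print(index_rows([("a", 1), ("b", 2)]))  # {'a': 1, 'b': 2}
```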
8 changes: 3 additions & 5 deletions sdks/python/apache_beam/dataframe/convert.py
@@ -17,11 +17,9 @@
 import inspect
 import warnings
 import weakref
+from collections.abc import Iterable
 from typing import Any
-from typing import Dict
-from typing import Iterable
 from typing import Optional
-from typing import Tuple
 from typing import Union
 
 import pandas as pd
@@ -172,7 +170,7 @@ def to_pcollection(
     always_return_tuple=False,
     yield_elements='schemas',
     include_indexes=False,
-    pipeline=None) -> Union[pvalue.PCollection, Tuple[pvalue.PCollection, ...]]:
+    pipeline=None) -> Union[pvalue.PCollection, tuple[pvalue.PCollection, ...]]:
   """Converts one or more deferred dataframe-like objects back to a PCollection.
 
   This method creates and applies the actual Beam operations that compute
@@ -252,7 +250,7 @@ def extract_input(placeholder):
         df for df in dataframes if df._expr._id not in TO_PCOLLECTION_CACHE
     ]
     if len(new_dataframes):
-      new_results: Dict[Any, pvalue.PCollection] = {
+      new_results: dict[Any, pvalue.PCollection] = {
           p: extract_input(p)
           for p in placeholders
       } | label >> transforms._DataframeExpressionsTransform(
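The two hunks above use builtin generics both in a return annotation (`tuple[pvalue.PCollection, ...]`, a variadic tuple) and in a local-variable annotation. A toy sketch of those two spots, with ints standing in for PCollections (hypothetical names):

```python
from typing import Any, Union


def to_values(*xs: int, always_return_tuple: bool = False) -> Union[int, tuple[int, ...]]:
    """Return a lone value, or a variadic tuple when requested."""
    if always_return_tuple or len(xs) != 1:
        return tuple(xs)
    return xs[0]


# Local-variable annotation with a builtin generic, as in `new_results`.
new_results: dict[Any, int] = {"a": to_values(1)}
print(to_values(1), to_values(1, 2), new_results)  # 1 (1, 2) {'a': 1}
```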
12 changes: 5 additions & 7 deletions sdks/python/apache_beam/dataframe/doctests.py
@@ -45,8 +45,6 @@
 import traceback
 from io import StringIO
 from typing import Any
-from typing import Dict
-from typing import List
 
 import numpy as np
 import pandas as pd
@@ -146,7 +144,7 @@ class _InMemoryResultRecorder(object):
   """
 
   # Class-level value to survive pickling.
-  _ALL_RESULTS = {}  # type: Dict[str, List[Any]]
+  _ALL_RESULTS = {}  # type: dict[str, list[Any]]
 
   def __init__(self):
     self._id = id(self)
@@ -729,15 +727,15 @@ def wrapper(fn):
 
   Args:
     optionflags (int): Passed through to doctests.
-    extraglobs (Dict[str,Any]): Passed through to doctests.
+    extraglobs (dict[str,Any]): Passed through to doctests.
     use_beam (bool): If true, run a Beam pipeline with partitioned input to
       verify the examples, else use PartitioningSession to simulate
      distributed execution.
-    skip (Dict[str,str]): A set of examples to skip entirely.
+    skip (dict[str,str]): A set of examples to skip entirely.
      If a key is '*', an example will be skipped in all test scenarios.
-    wont_implement_ok (Dict[str,str]): A set of examples that are allowed to
+    wont_implement_ok (dict[str,str]): A set of examples that are allowed to
      raise WontImplementError.
-    not_implemented_ok (Dict[str,str]): A set of examples that are allowed to
+    not_implemented_ok (dict[str,str]): A set of examples that are allowed to
      raise NotImplementedError.
 
   Returns:
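The `_ALL_RESULTS` hunk keeps the type-comment style while swapping in builtin generics; type comments are never evaluated at runtime, so this is safe as long as the type checker targets Python 3.9+ (a minimal sketch, assuming mypy or similar):

```python
from typing import Any

# Class-level registry annotated via a type comment, as in the diff above.
_ALL_RESULTS = {}  # type: dict[str, list[Any]]

_ALL_RESULTS.setdefault("doctest-1", []).append("PASS")
print(_ALL_RESULTS)  # {'doctest-1': ['PASS']}
```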
33 changes: 15 additions & 18 deletions sdks/python/apache_beam/dataframe/expressions.py
@@ -17,10 +17,10 @@
 import contextlib
 import random
 import threading
+from collections.abc import Callable
+from collections.abc import Iterable
 from typing import Any
-from typing import Callable
 from typing import Generic
-from typing import Iterable
 from typing import Optional
 from typing import TypeVar
 
@@ -251,9 +251,9 @@ def preserves_partition_by(self) -> partitionings.Partitioning:
 
 class PlaceholderExpression(Expression):
   """An expression whose value must be explicitly bound in the session."""
   def __init__(
-      self,  # type: PlaceholderExpression
-      proxy,  # type: T
-      reference=None,  # type: Any
+      self,
+      proxy: T,
+      reference: Any = None,
   ):
     """Initialize a placeholder expression.
@@ -282,11 +282,7 @@ def preserves_partition_by(self):
 
 class ConstantExpression(Expression):
   """An expression whose value is known at pipeline construction time."""
-  def __init__(
-      self,  # type: ConstantExpression
-      value,  # type: T
-      proxy=None  # type: Optional[T]
-  ):
+  def __init__(self, value: T, proxy: Optional[T] = None):
     """Initialize a constant expression.
 
     Args:
@@ -319,14 +315,15 @@ def preserves_partition_by(self):
 class ComputedExpression(Expression):
   """An expression whose value must be computed at pipeline execution time."""
   def __init__(
-      self,  # type: ComputedExpression
-      name,  # type: str
-      func,  # type: Callable[...,T]
-      args,  # type: Iterable[Expression]
-      proxy=None,  # type: Optional[T]
-      _id=None,  # type: Optional[str]
-      requires_partition_by=partitionings.Index(),  # type: partitionings.Partitioning
-      preserves_partition_by=partitionings.Singleton(),  # type: partitionings.Partitioning
+      self,
+      name: str,
+      func: Callable[..., T],
+      args: Iterable[Expression],
+      proxy: Optional[T] = None,
+      _id: Optional[str] = None,
+      requires_partition_by: partitionings.Partitioning = partitionings.Index(),
+      preserves_partition_by: partitionings.Partitioning = partitionings.
+      Singleton(),
   ):
     """Initialize a computed expression.
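The three `__init__` rewrites above follow one recipe: drop the `# type:` comment on `self` (it needs no annotation) and turn each per-parameter type comment into an inline annotation. A hypothetical before/after sketch outside Beam:

```python
from collections.abc import Callable
from typing import Optional, TypeVar

T = TypeVar('T')


class ComputedValue:
    # Before:
    #   def __init__(
    #       self,  # type: ComputedValue
    #       name,  # type: str
    #       func,  # type: Callable[..., T]
    #       proxy=None,  # type: Optional[T]
    #   ):
    # After: inline annotations, identical runtime behavior.
    def __init__(self, name: str, func: Callable[..., T], proxy: Optional[T] = None):
        self.name = name
        self.func = func
        self.proxy = proxy


v = ComputedValue('double', lambda x: x * 2)
print(v.name, v.func(21))  # double 42
```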
8 changes: 3 additions & 5 deletions sdks/python/apache_beam/dataframe/frame_base.py
@@ -17,15 +17,13 @@
 import functools
 import operator
 import re
+from collections.abc import Callable
 from inspect import cleandoc
 from inspect import getfullargspec
 from inspect import isclass
 from inspect import ismodule
 from inspect import unwrap
 from typing import Any
-from typing import Callable
-from typing import Dict
-from typing import List
 from typing import Optional
 from typing import Tuple
 from typing import Union
@@ -38,7 +36,7 @@
 
 class DeferredBase(object):
 
-  _pandas_type_map: Dict[Union[type, None], type] = {}
+  _pandas_type_map: dict[Union[type, None], type] = {}
 
   def __init__(self, expr):
     self._expr = expr
@@ -229,7 +227,7 @@ def _elementwise_function(
 def _proxy_function(
     func: Union[Callable, str],
     name: Optional[str] = None,
-    restrictions: Optional[Dict[str, Union[Any, List[Any]]]] = None,
+    restrictions: Optional[dict[str, Union[Any, list[Any]]]] = None,
     inplace: bool = False,
     base: Optional[type] = None,
     *,
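Note that `Union` and `Optional` still come from `typing` in the new code: PEP 585 only covers the container generics, and the `X | Y` union spelling is PEP 604, which needs Python 3.10+. A small sketch of the resulting mixed style (illustrative mapping, not Beam's):

```python
from typing import Any, Optional, Union

# Builtin dict as a generic (PEP 585) combined with typing.Union, which
# has no builtin replacement before Python 3.10.
_type_labels: dict[Union[type, None], str] = {int: 'integer', None: 'missing'}


def describe(value: Any, fallback: Optional[str] = None) -> str:
    """Look up a label for value's type, with an optional fallback."""
    key = type(value) if value is not None else None
    return _type_labels.get(key, fallback or 'unknown')


print(describe(3), describe(None), describe('x'))  # integer missing unknown
```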
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/dataframe/frames.py
@@ -38,7 +36,6 @@
 import math
 import re
 import warnings
-from typing import List
 from typing import Optional
 
 import numpy as np
@@ -2660,7 +2659,7 @@ def get(self, key, default_value=None):
   @frame_base.populate_defaults(pd.DataFrame)
   @frame_base.maybe_inplace
   def set_index(self, keys, **kwargs):
-    """``keys`` must be a ``str`` or ``List[str]``. Passing an Index or Series
+    """``keys`` must be a ``str`` or ``list[str]``. Passing an Index or Series
     is not yet supported (`Issue 20759
     <https://github.com/apache/beam/issues/20759>`_)."""
     if isinstance(keys, str):
@@ -4574,7 +4573,7 @@ def value_counts(self, **kwargs):
 tshift = frame_base.wont_implement_method(
     DataFrameGroupBy, 'tshift', reason="deprecated")
 
-def _maybe_project_func(projection: Optional[List[str]]):
+def _maybe_project_func(projection: Optional[list[str]]):
   """ Returns identity func if projection is empty or None, else returns
   a function that projects the specified columns. """
   if projection:
@@ -4967,7 +4966,7 @@ def func(*args):
 
   else:
     raise frame_base.WontImplementError(
-        "others must be None, DeferredSeries, or List[DeferredSeries] "
+        "others must be None, DeferredSeries, or list[DeferredSeries] "
         f"(encountered {type(others)}). Other types are not supported "
        "because they make this operation sensitive to the order of the "
        "data.", reason="order-sensitive")
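For illustration, the `_maybe_project_func` docstring above fully describes its contract; a standalone sketch of that behavior, using plain dicts in place of pandas objects:

```python
from typing import Optional


def maybe_project_func(projection: Optional[list[str]]):
    """Identity if projection is empty or None, else a function that keeps
    only the named columns (dicts stand in for DataFrames here)."""
    if projection:
        return lambda row: {col: row[col] for col in projection}
    return lambda row: row


row = {'a': 1, 'b': 2, 'c': 3}
print(maybe_project_func(['a', 'c'])(row))  # {'a': 1, 'c': 3}
print(maybe_project_func(None)(row))        # {'a': 1, 'b': 2, 'c': 3}
```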
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/dataframe/frames_test.py
@@ -18,7 +18,6 @@
 import sys
 import unittest
 import warnings
-from typing import Dict
 
 import numpy as np
 import pandas as pd
@@ -1707,7 +1706,7 @@ def test_pivot_no_index_provided_on_multiindex(self):
       'describe'))
 
 
-def numeric_only_kwargs_for_pandas_2(agg_type: str) -> Dict[str, bool]:
+def numeric_only_kwargs_for_pandas_2(agg_type: str) -> dict[str, bool]:
   """Get proper arguments for numeric_only.
 
   Behavior for numeric_only in these methods changed in Pandas 2 to default
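The helper above returns a kwargs dict keyed on the installed pandas major version; roughly how such a shim can look (the version test and method set here are assumptions, not Beam's exact logic):

```python
import pandas as pd

# Aggregations whose numeric_only default changed in pandas 2 (illustrative).
_CHANGED_IN_PANDAS_2 = {'mean', 'median', 'std', 'var'}


def numeric_only_kwargs_for_pandas_2(agg_type: str) -> dict[str, bool]:
    """Return kwargs that pin numeric_only to a version-stable value."""
    if int(pd.__version__.split('.')[0]) >= 2 and agg_type in _CHANGED_IN_PANDAS_2:
        return {'numeric_only': True}
    return {}


df = pd.DataFrame({'x': [1.0, 2.0], 'label': ['a', 'b']})
print(df.mean(**numeric_only_kwargs_for_pandas_2('mean')))  # x    1.5
```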
2 changes: 1 addition & 1 deletion
@@ -18,7 +18,7 @@
 """
 
 import re
-from typing import Mapping
+from collections.abc import Mapping
 
 import pandas as pd
 
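Since Python 3.9, the `collections.abc` interfaces are themselves subscriptable, so `Mapping[str, int]` works without the deprecated `typing.Mapping` alias — hence the one-line swap above. A tiny sketch:

```python
from collections.abc import Mapping


def total(counts: Mapping[str, int]) -> int:
    """Sum the values of any mapping-like argument (dict, ChainMap, ...)."""
    return sum(counts.values())


print(total({'a': 1, 'b': 2}))  # 3
```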
5 changes: 2 additions & 3 deletions sdks/python/apache_beam/dataframe/partitionings.py
@@ -15,9 +15,8 @@
 # limitations under the License.
 
 import random
+from collections.abc import Iterable
 from typing import Any
-from typing import Iterable
-from typing import Tuple
 from typing import TypeVar
 
 import numpy as np
@@ -47,7 +46,7 @@ def __le__(self, other):
     return not self.is_subpartitioning_of(other)
 
   def partition_fn(self, df: Frame,
-                   num_partitions: int) -> Iterable[Tuple[Any, Frame]]:
+                   num_partitions: int) -> Iterable[tuple[Any, Frame]]:
     """A callable that actually performs the partitioning of a Frame df.
 
     This will be invoked via a FlatMap in conjunction with a GroupKey to
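The docstring describes `partition_fn` as yielding `(key, frame)` pairs for a downstream FlatMap plus grouping step; a rough standalone sketch in that shape, hash-partitioning on the index (an assumption, not Beam's implementation):

```python
from collections.abc import Iterable
from typing import Any

import pandas as pd


def partition_fn(df: pd.DataFrame,
                 num_partitions: int) -> Iterable[tuple[Any, pd.DataFrame]]:
    """Yield (partition_key, sub_frame) pairs by hashing the index."""
    keys = pd.util.hash_array(df.index.to_numpy()) % num_partitions
    for key, sub_frame in df.groupby(keys):
        yield key, sub_frame


df = pd.DataFrame({'x': range(6)})
for key, part in partition_fn(df, num_partitions=2):
    print(key, len(part))
```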
6 changes: 2 additions & 4 deletions sdks/python/apache_beam/dataframe/schemas.py
@@ -24,12 +24,10 @@
 # pytype: skip-file
 
 import warnings
+from collections.abc import Sequence
 from typing import Any
-from typing import Dict
 from typing import NamedTuple
 from typing import Optional
-from typing import Sequence
-from typing import Tuple
 from typing import TypeVar
 from typing import Union
@@ -170,7 +168,7 @@ def element_typehint_from_dataframe_proxy(
 
   fields = [(column, dtype_to_fieldtype(dtype))
             for (column, dtype) in output_columns]
-  field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]]
+  field_options: Optional[dict[str, Sequence[tuple[str, Any]]]]
   if include_indexes:
     field_options = {
         index_name: [(INDEX_OPTION_NAME, None)]
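`field_options` above is a bare annotation: it declares the variable's type for the checker without assigning a value, and the branches that follow bind either a dict or None. A minimal sketch of the pattern (option names are placeholders, not Beam's):

```python
from collections.abc import Sequence
from typing import Any, Optional

include_indexes = True

# Bare annotation: declares the type, binds no value yet.
field_options: Optional[dict[str, Sequence[tuple[str, Any]]]]
if include_indexes:
    field_options = {'index_col': [('index_option', None)]}
else:
    field_options = None

print(field_options)  # {'index_col': [('index_option', None)]}
```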
29 changes: 10 additions & 19 deletions sdks/python/apache_beam/dataframe/transforms.py
@@ -16,12 +16,8 @@
 
 import collections
 import logging
-from typing import TYPE_CHECKING
+from collections.abc import Mapping
 from typing import Any
-from typing import Dict
-from typing import List
-from typing import Mapping
-from typing import Tuple
 from typing import TypeVar
 from typing import Union
@@ -32,16 +28,13 @@
 from apache_beam.dataframe import expressions
 from apache_beam.dataframe import frames  # pylint: disable=unused-import
 from apache_beam.dataframe import partitionings
+from apache_beam.pvalue import PCollection
 from apache_beam.utils import windowed_value
 
 __all__ = [
     'DataframeTransform',
 ]
 
-if TYPE_CHECKING:
-  # pylint: disable=ungrouped-imports
-  from apache_beam.pvalue import PCollection
-
 T = TypeVar('T')
 
 TARGET_PARTITION_SIZE = 1 << 23  # 8M
@@ -108,15 +101,15 @@ def expand(self, input_pcolls):
     from apache_beam.dataframe import convert
 
     # Convert inputs to a flat dict.
-    input_dict = _flatten(input_pcolls)  # type: Dict[Any, PCollection]
+    input_dict = _flatten(input_pcolls)  # type: dict[Any, PCollection]
     proxies = _flatten(self._proxy) if self._proxy is not None else {
         tag: None
         for tag in input_dict
     }
     input_frames = {
         k: convert.to_dataframe(pc, proxies[k])
         for k, pc in input_dict.items()
-    }  # type: Dict[Any, DeferredFrame]  # noqa: F821
+    }  # type: dict[Any, DeferredFrame]  # noqa: F821
 
     # Apply the function.
     frames_input = _substitute(input_pcolls, input_frames)
@@ -152,9 +145,9 @@ def expand(self, inputs):
 
   def _apply_deferred_ops(
       self,
-      inputs,  # type: Dict[expressions.Expression, PCollection]
-      outputs,  # type: Dict[Any, expressions.Expression]
-  ):  # -> Dict[Any, PCollection]
+      inputs: dict[expressions.Expression, PCollection],
+      outputs: dict[Any, expressions.Expression],
+  ) -> dict[Any, PCollection]:
     """Construct a Beam graph that evaluates a set of expressions on a set of
     input PCollections.
@@ -585,11 +578,9 @@ def _concat(parts):
 
 
 def _flatten(
-    valueish,  # type: Union[T, List[T], Tuple[T], Dict[Any, T]]
-    root=(),  # type: Tuple[Any, ...]
-):
-  # type: (...) -> Mapping[Tuple[Any, ...], T]
-
+    valueish: Union[T, list[T], tuple[T], dict[Any, T]],
+    root: tuple[Any, ...] = (),
+) -> Mapping[tuple[Any, ...], T]:
   """Given a nested structure of dicts, tuples, and lists, return a flat
   dictionary where the values are the leafs and the keys are the "paths" to
   these leaves.
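Finally, `_flatten`'s docstring fully specifies its behavior; a standalone sketch matching that contract with the new-style annotations (the recursion details are a guess, not Beam's exact code):

```python
from collections.abc import Mapping
from typing import Any, TypeVar, Union

T = TypeVar('T')


def _flatten(
    valueish: Union[T, list[T], tuple[T, ...], dict[Any, T]],
    root: tuple[Any, ...] = (),
) -> Mapping[tuple[Any, ...], T]:
    """Flatten nested dicts/lists/tuples into {path-tuple: leaf}."""
    if isinstance(valueish, dict):
        items = valueish.items()
    elif isinstance(valueish, (list, tuple)):
        items = enumerate(valueish)
    else:
        return {root: valueish}
    flat: dict[tuple[Any, ...], T] = {}
    for key, value in items:
        flat.update(_flatten(value, root + (key, )))
    return flat


print(_flatten({'a': [1, 2], 'b': {'c': 3}}))
# {('a', 0): 1, ('a', 1): 2, ('b', 'c'): 3}
```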
