Commit 3696140

Merge pull request #31755: Modernize type hints.
robertwb authored Jul 10, 2024
2 parents 080c80a + a0ba8de commit 3696140
Showing 106 changed files with 1,696 additions and 2,328 deletions.
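
The pattern applied throughout the diff is the same in every file: comment-style `# type:` hints are replaced by inline PEP 484 / PEP 526 annotations. A minimal before/after sketch of that conversion (made-up names, not Beam code):

    from typing import List, Optional

    # Before: comment-style hints, visible to type checkers but not at runtime.
    # def lookup(key, default=None):
    #     # type: (str, Optional[int]) -> int
    #     ...

    # After: inline annotations on parameters, return values, and variables.
    def lookup(key: str, default: Optional[int] = None) -> int:
        items: List[int] = []  # replaces `items = []  # type: List[int]`
        return default if default is not None else len(items) + len(key)
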
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/coders/observable_test.py
@@ -29,7 +29,7 @@
 class ObservableMixinTest(unittest.TestCase):
   observed_count = 0
   observed_sum = 0
-  observed_keys = []  # type: List[Optional[str]]
+  observed_keys: List[Optional[str]] = []

   def observer(self, value, key=None):
     self.observed_count += 1
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/coders/row_coder.py
@@ -117,8 +117,7 @@ def from_type_hint(cls, type_hint, registry):
     return cls(schema)

   @staticmethod
-  def from_payload(payload):
-    # type: (bytes) -> RowCoder
+  def from_payload(payload: bytes) -> 'RowCoder':
     return RowCoder(proto_utils.parse_Bytes(payload, schema_pb2.Schema))

   def __reduce__(self):
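One detail worth noting in hunks like the one above: the new return annotation is the string 'RowCoder' rather than a bare name, because the class is still being defined when the method line is evaluated, so a bare annotation would raise NameError. A minimal sketch of the same idea with a made-up class (a quoted forward reference; a module-wide `from __future__ import annotations` would work as well):

    class Node:  # hypothetical stand-in, not Beam code
        def __init__(self, payload: bytes) -> None:
            self.payload = payload

        @staticmethod
        def from_payload(payload: bytes) -> 'Node':
            # A bare `-> Node` here would fail: the name Node is not bound
            # until the class statement finishes executing.
            return Node(payload)
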
32 changes: 11 additions & 21 deletions sdks/python/apache_beam/coders/slow_stream.py
@@ -30,11 +30,10 @@ class OutputStream(object):

   A pure Python implementation of stream.OutputStream."""
   def __init__(self):
-    self.data = []  # type: List[bytes]
+    self.data: List[bytes] = []
     self.byte_count = 0

-  def write(self, b, nested=False):
-    # type: (bytes, bool) -> None
+  def write(self, b: bytes, nested: bool = False) -> None:
     assert isinstance(b, bytes)
     if nested:
       self.write_var_int64(len(b))
@@ -45,8 +44,7 @@ def write_byte(self, val):
     self.data.append(chr(val).encode('latin-1'))
     self.byte_count += 1

-  def write_var_int64(self, v):
-    # type: (int) -> None
+  def write_var_int64(self, v: int) -> None:
     if v < 0:
       v += 1 << 64
       if v <= 0:
@@ -78,16 +76,13 @@ def write_bigendian_double(self, v):
   def write_bigendian_float(self, v):
     self.write(struct.pack('>f', v))

-  def get(self):
-    # type: () -> bytes
+  def get(self) -> bytes:
     return b''.join(self.data)

-  def size(self):
-    # type: () -> int
+  def size(self) -> int:
     return self.byte_count

-  def _clear(self):
-    # type: () -> None
+  def _clear(self) -> None:
     self.data = []
     self.byte_count = 0

@@ -101,8 +96,7 @@ def __init__(self):
     super().__init__()
     self.count = 0

-  def write(self, byte_array, nested=False):
-    # type: (bytes, bool) -> None
+  def write(self, byte_array: bytes, nested: bool = False) -> None:
     blen = len(byte_array)
     if nested:
       self.write_var_int64(blen)
@@ -125,25 +119,21 @@ class InputStream(object):
   """For internal use only; no backwards-compatibility guarantees.

   A pure Python implementation of stream.InputStream."""
-  def __init__(self, data):
-    # type: (bytes) -> None
+  def __init__(self, data: bytes) -> None:
     self.data = data
     self.pos = 0

   def size(self):
     return len(self.data) - self.pos

-  def read(self, size):
-    # type: (int) -> bytes
+  def read(self, size: int) -> bytes:
     self.pos += size
     return self.data[self.pos - size:self.pos]

-  def read_all(self, nested):
-    # type: (bool) -> bytes
+  def read_all(self, nested: bool) -> bytes:
     return self.read(self.read_var_int64() if nested else self.size())

-  def read_byte(self):
-    # type: () -> int
+  def read_byte(self) -> int:
     self.pos += 1
     return self.data[self.pos - 1]

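For context on the write_var_int64 logic visible in the hunk above (the `v += 1 << 64` branch for negative inputs), here is a standalone sketch of the base-128 varint scheme these streams use, assuming the usual encoding: negative values are mapped to their unsigned 64-bit two's-complement form, then emitted seven bits at a time, least-significant group first, with the high bit of each byte marking that more bytes follow. The function names below are made up for illustration.

    def encode_var_int64(v: int) -> bytes:
        if v < 0:
            v += 1 << 64  # two's-complement mapping, as in the diff above
        out = bytearray()
        while True:
            bits = v & 0x7F
            v >>= 7
            if v:
                out.append(bits | 0x80)  # continuation bit set
            else:
                out.append(bits)
                return bytes(out)

    def decode_var_int64(data: bytes) -> int:
        result = shift = 0
        for byte in data:
            result |= (byte & 0x7F) << shift
            shift += 7
            if not byte & 0x80:
                break
        if result >= 1 << 63:  # undo the two's-complement mapping
            result -= 1 << 64
        return result

    assert encode_var_int64(300) == b'\xac\x02'
    assert decode_var_int64(encode_var_int64(-1)) == -1
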
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/coders/standard_coders_test.py
@@ -300,7 +300,7 @@ def json_value_parser(self, coder_spec):
   # Used when --fix is passed.

   fix = False
-  to_fix = {}  # type: Dict[Tuple[int, bytes], bytes]
+  to_fix: Dict[Tuple[int, bytes], bytes] = {}

   @classmethod
   def tearDownClass(cls):
20 changes: 10 additions & 10 deletions sdks/python/apache_beam/coders/typecoders.py
@@ -80,8 +80,8 @@ def MakeXyzs(v):
 class CoderRegistry(object):
   """A coder registry for typehint/coder associations."""
   def __init__(self, fallback_coder=None):
-    self._coders = {}  # type: Dict[Any, Type[coders.Coder]]
-    self.custom_types = []  # type: List[Any]
+    self._coders: Dict[Any, Type[coders.Coder]] = {}
+    self.custom_types: List[Any] = []
     self.register_standard_coders(fallback_coder)

   def register_standard_coders(self, fallback_coder):
@@ -104,12 +104,14 @@ def register_standard_coders(self, fallback_coder):
   def register_fallback_coder(self, fallback_coder):
     self._fallback_coder = FirstOf([fallback_coder, self._fallback_coder])

-  def _register_coder_internal(self, typehint_type, typehint_coder_class):
-    # type: (Any, Type[coders.Coder]) -> None
+  def _register_coder_internal(
+      self, typehint_type: Any,
+      typehint_coder_class: Type[coders.Coder]) -> None:
     self._coders[typehint_type] = typehint_coder_class

-  def register_coder(self, typehint_type, typehint_coder_class):
-    # type: (Any, Type[coders.Coder]) -> None
+  def register_coder(
+      self, typehint_type: Any,
+      typehint_coder_class: Type[coders.Coder]) -> None:
     if not isinstance(typehint_coder_class, type):
       raise TypeError(
           'Coder registration requires a coder class object. '
@@ -122,8 +124,7 @@ def register_coder(self, typehint_type, typehint_coder_class):
     typehint_type = getattr(typehint_type, '__name__', str(typehint_type))
     self._register_coder_internal(typehint_type, typehint_coder_class)

-  def get_coder(self, typehint):
-    # type: (Any) -> coders.Coder
+  def get_coder(self, typehint: Any) -> coders.Coder:
     if typehint and typehint.__module__ == '__main__':
       # See https://github.com/apache/beam/issues/21541
       # TODO(robertwb): Remove once all runners are portable.
@@ -187,8 +188,7 @@ class FirstOf(object):
   """For internal use only; no backwards-compatibility guarantees.

   A class used to get the first matching coder from a list of coders."""
-  def __init__(self, coders):
-    # type: (Iterable[Type[coders.Coder]]) -> None
+  def __init__(self, coders: Iterable[Type[coders.Coder]]) -> None:
     self._coders = coders

   def from_type_hint(self, typehint, registry):
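The register_coder method annotated above is the public entry point on the module-level registry. A hedged usage sketch, following the pattern documented in the Beam programming guide (Point and PointCoder are made-up names; only the registry call itself is the API shown in the diff):

    import apache_beam as beam

    class Point:
        def __init__(self, x: int, y: int) -> None:
            self.x, self.y = x, y

    class PointCoder(beam.coders.Coder):
        def encode(self, value: Point) -> bytes:
            return b'%d:%d' % (value.x, value.y)

        def decode(self, encoded: bytes) -> Point:
            x, y = encoded.split(b':')
            return Point(int(x), int(y))

        def is_deterministic(self) -> bool:
            return True

    # Associate the element type with its coder in the default registry.
    beam.coders.registry.register_coder(Point, PointCoder)
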
38 changes: 16 additions & 22 deletions sdks/python/apache_beam/dataframe/convert.py
@@ -17,10 +17,10 @@
 import inspect
 import warnings
 import weakref
-from typing import TYPE_CHECKING
 from typing import Any
 from typing import Dict
 from typing import Iterable
+from typing import Optional
 from typing import Tuple
 from typing import Union

@@ -35,19 +35,13 @@
 from apache_beam.dataframe.schemas import generate_proxy
 from apache_beam.typehints.pandas_type_compatibility import dtype_to_fieldtype

-if TYPE_CHECKING:
-  # pylint: disable=ungrouped-imports
-  from typing import Optional
-

 # TODO: Or should this be called as_dataframe?
 def to_dataframe(
-    pcoll,  # type: pvalue.PCollection
-    proxy=None,  # type: Optional[pd.core.generic.NDFrame]
-    label=None,  # type: Optional[str]
-):
-  # type: (...) -> frame_base.DeferredFrame
-
+    pcoll: pvalue.PCollection,
+    proxy: Optional[pd.core.generic.NDFrame] = None,
+    label: Optional[str] = None,
+) -> frame_base.DeferredFrame:
   """Converts a PCollection to a deferred dataframe-like object, which can
   manipulated with pandas methods like `filter` and `groupby`.

@@ -93,10 +87,10 @@ def to_dataframe(
 # Note that the pipeline (indirectly) holds references to the transforms which
 # keeps both the PCollections and expressions alive. This ensures the
 # expression's ids are never accidentally re-used.
-TO_PCOLLECTION_CACHE = weakref.WeakValueDictionary(
-)  # type: weakref.WeakValueDictionary[str, pvalue.PCollection]
-UNBATCHED_CACHE = weakref.WeakValueDictionary(
-)  # type: weakref.WeakValueDictionary[str, pvalue.PCollection]
+TO_PCOLLECTION_CACHE: 'weakref.WeakValueDictionary[str, pvalue.PCollection]' = (
+    weakref.WeakValueDictionary())
+UNBATCHED_CACHE: 'weakref.WeakValueDictionary[str, pvalue.PCollection]' = (
+    weakref.WeakValueDictionary())


 class RowsToDataFrameFn(beam.DoFn):
@@ -173,7 +167,7 @@ def infer_output_type(self, input_element_type):


 def to_pcollection(
-    *dataframes,  # type: Union[frame_base.DeferredFrame, pd.DataFrame, pd.Series]
+    *dataframes: Union[frame_base.DeferredFrame, pd.DataFrame, pd.Series],
     label=None,
     always_return_tuple=False,
     yield_elements='schemas',
@@ -258,12 +252,12 @@ def extract_input(placeholder):
       df for df in dataframes if df._expr._id not in TO_PCOLLECTION_CACHE
   ]
   if len(new_dataframes):
-    new_results = {p: extract_input(p)
-                   for p in placeholders
-                   } | label >> transforms._DataframeExpressionsTransform({
-                       ix: df._expr
-                       for (ix, df) in enumerate(new_dataframes)
-                   })  # type: Dict[Any, pvalue.PCollection]
+    new_results: Dict[Any, pvalue.PCollection] = {
+        p: extract_input(p)
+        for p in placeholders
+    } | label >> transforms._DataframeExpressionsTransform(
+        {ix: df._expr
+         for (ix, df) in enumerate(new_dataframes)})

     TO_PCOLLECTION_CACHE.update(
         {new_dataframes[ix]._expr._id: pc
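A side note on the TO_PCOLLECTION_CACHE / UNBATCHED_CACHE hunk above: the new annotations are string literals, presumably because subscripting weakref.WeakValueDictionary at runtime is not supported on all interpreter versions Beam targets, so an unquoted generic could fail at import time. A minimal sketch of the same workaround with a made-up cache (quoting keeps the generic visible to type checkers only; `from __future__ import annotations` has the same effect):

    import weakref

    class Resource:
        """Hypothetical value type; WeakValueDictionary values must be weak-referenceable."""

    CACHE: 'weakref.WeakValueDictionary[str, Resource]' = weakref.WeakValueDictionary()

    r = Resource()
    CACHE['key'] = r
    assert CACHE.get('key') is r
    del r  # once the last strong reference is gone, the entry can be dropped
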
20 changes: 10 additions & 10 deletions sdks/python/apache_beam/dataframe/frame_base.py
@@ -38,7 +38,7 @@

 class DeferredBase(object):

-  _pandas_type_map = {}  # type: Dict[Union[type, None], type]
+  _pandas_type_map: Dict[Union[type, None], type] = {}

   def __init__(self, expr):
     self._expr = expr
@@ -197,8 +197,8 @@ def _proxy_method(
     inplace=False,
     base=None,
     *,
-    requires_partition_by,  # type: partitionings.Partitioning
-    preserves_partition_by,  # type: partitionings.Partitioning
+    requires_partition_by: partitionings.Partitioning,
+    preserves_partition_by: partitionings.Partitioning,
 ):
   if name is None:
     name, func = name_and_func(func)
@@ -227,14 +227,14 @@ def _elementwise_function(


 def _proxy_function(
-    func,  # type: Union[Callable, str]
-    name=None,  # type: Optional[str]
-    restrictions=None,  # type: Optional[Dict[str, Union[Any, List[Any]]]]
-    inplace=False,  # type: bool
-    base=None,  # type: Optional[type]
+    func: Union[Callable, str],
+    name: Optional[str] = None,
+    restrictions: Optional[Dict[str, Union[Any, List[Any]]]] = None,
+    inplace: bool = False,
+    base: Optional[type] = None,
     *,
-    requires_partition_by,  # type: partitionings.Partitioning
-    preserves_partition_by,  # type: partitionings.Partitioning
+    requires_partition_by: partitionings.Partitioning,
+    preserves_partition_by: partitionings.Partitioning,
 ):

   if name is None:
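Both _proxy_method and _proxy_function keep their keyword-only section: parameters after the bare `*` must be passed by name, and the inline annotations now sit directly on those lines instead of trailing `# type:` comments. A small generic sketch of that signature style (made-up names, not the Beam helpers):

    from typing import Callable, Optional

    def make_proxy(
        func: Callable,
        name: Optional[str] = None,
        *,
        requires_partition_by: str,
        preserves_partition_by: str,
    ) -> str:
        return f'{name or func.__name__}: {requires_partition_by} -> {preserves_partition_by}'

    # Keyword-only arguments must be named at the call site:
    print(make_proxy(len, requires_partition_by='Arbitrary', preserves_partition_by='Singleton'))
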
9 changes: 3 additions & 6 deletions sdks/python/apache_beam/dataframe/partitionings.py
@@ -32,9 +32,7 @@ class Partitioning(object):
   def __repr__(self):
     return self.__class__.__name__

-  def is_subpartitioning_of(self, other):
-    # type: (Partitioning) -> bool
-
+  def is_subpartitioning_of(self, other: 'Partitioning') -> bool:
     """Returns whether self is a sub-partition of other.

     Specifically, returns whether something partitioned by self is necissarily
@@ -48,9 +46,8 @@ def __lt__(self, other):
   def __le__(self, other):
     return not self.is_subpartitioning_of(other)

-  def partition_fn(self, df, num_partitions):
-    # type: (Frame, int) -> Iterable[Tuple[Any, Frame]]
-
+  def partition_fn(self, df: Frame,
+                   num_partitions: int) -> Iterable[Tuple[Any, Frame]]:
     """A callable that actually performs the partitioning of a Frame df.

     This will be invoked via a FlatMap in conjunction with a GroupKey to
9 changes: 3 additions & 6 deletions sdks/python/apache_beam/dataframe/schemas.py
@@ -85,9 +85,7 @@ def expand(self, pcoll):
         | beam.Map(converter.produce_batch))


-def generate_proxy(element_type):
-  # type: (type) -> pd.DataFrame
-
+def generate_proxy(element_type: type) -> pd.DataFrame:
   """Generate a proxy pandas object for the given PCollection element_type.

   Currently only supports generating a DataFrame proxy from a schema-aware
@@ -106,9 +104,8 @@ def generate_proxy(element_type):
   return proxy


-def element_type_from_dataframe(proxy, include_indexes=False):
-  # type: (pd.DataFrame, bool) -> type
-
+def element_type_from_dataframe(
+    proxy: pd.DataFrame, include_indexes: bool = False) -> type:
   """Generate an element_type for an element-wise PCollection from a proxy
   pandas object. Currently only supports converting the element_type for
   a schema-aware PCollection to a proxy DataFrame.
(The remaining changed files in this commit are not shown here.)

0 comments on commit 3696140
