Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modernize type hints #31755

Merged
merged 29 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
43b237e
Modernize python type hints for apache_beam
robertwb Jul 2, 2024
cd495e9
Modernize python type hints for apache_beam/coders
robertwb Jul 2, 2024
6143cd0
Modernize python type hints for apache_beam/dataframe
robertwb Jul 2, 2024
d75916b
Modernize python type hints for apache_beam/examples/cookbook
robertwb Jul 2, 2024
c842252
Modernize python type hints for apache_beam/internal
robertwb Jul 2, 2024
33bde4d
Modernize python type hints for apache_beam/internal/metrics
robertwb Jul 2, 2024
d73982a
Modernize python type hints for apache_beam/io
robertwb Jul 2, 2024
8f6f24d
Modernize python type hints for apache_beam/io/azure
robertwb Jul 2, 2024
b8029e9
Modernize python type hints for apache_beam/io/flink
robertwb Jul 2, 2024
f49a29a
Modernize python type hints for apache_beam/io/gcp
robertwb Jul 2, 2024
5d5a09b
Modernize python type hints for apache_beam/metrics
robertwb Jul 2, 2024
fbafe8d
Modernize python type hints for apache_beam/ml/gcp
robertwb Jul 2, 2024
0eab298
Modernize python type hints for apache_beam/options
robertwb Jul 2, 2024
842b8ec
Modernize python type hints for apache_beam/runners
robertwb Jul 2, 2024
0763d7e
Modernize python type hints for apache_beam/runners/direct
robertwb Jul 2, 2024
3cf0c55
Modernize python type hints for apache_beam/runners/interactive
robertwb Jul 2, 2024
b416982
Modernize python type hints for apache_beam/runners/job
robertwb Jul 2, 2024
8b540eb
Modernize python type hints for apache_beam/runners/portability
robertwb Jul 2, 2024
8fdbe88
Modernize python type hints for apache_beam/runners/worker
robertwb Jul 2, 2024
77d8189
Modernize python type hints for apache_beam/testing/benchmarks
robertwb Jul 2, 2024
acfd72c
Modernize python type hints for apache_beam/testing/load_tests
robertwb Jul 2, 2024
4402f2d
Modernize python type hints for apache_beam/transforms
robertwb Jul 2, 2024
79d4ffd
Modernize python type hints for apache_beam/typehints
robertwb Jul 2, 2024
f2ffa5e
Modernize python type hints for apache_beam/utils
robertwb Jul 2, 2024
abdb1b7
Fix circular references, mypy complaints.
robertwb Jul 2, 2024
d4de077
Fix bad type declarations.
robertwb Jul 3, 2024
14c52d6
Fix bad typing in PubSub tests.
robertwb Jul 3, 2024
64e6194
Preserve existing linter comments.
robertwb Jul 3, 2024
a0ba8de
isort
robertwb Jul 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/coders/row_coder.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def from_type_hint(cls, type_hint, registry):
return cls(schema)

@staticmethod
def from_payload(payload: bytes) -> RowCoder:
def from_payload(payload: bytes) -> 'RowCoder':
return RowCoder(proto_utils.parse_Bytes(payload, schema_pb2.Schema))

def __reduce__(self):
Expand Down
6 changes: 1 addition & 5 deletions sdks/python/apache_beam/dataframe/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
import inspect
import warnings
import weakref
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import Iterable
from typing import Optional
from typing import Tuple
from typing import Union

Expand All @@ -35,10 +35,6 @@
from apache_beam.dataframe.schemas import generate_proxy
from apache_beam.typehints.pandas_type_compatibility import dtype_to_fieldtype

if TYPE_CHECKING:
# pylint: disable=ungrouped-imports
from typing import Optional


# TODO: Or should this be called as_dataframe?
def to_dataframe(
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/dataframe/partitionings.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Partitioning(object):
def __repr__(self):
return self.__class__.__name__

def is_subpartitioning_of(self, other: Partitioning) -> bool:
def is_subpartitioning_of(self, other: 'Partitioning') -> bool:
"""Returns whether self is a sub-partition of other.

    Specifically, returns whether something partitioned by self is necessarily
Expand Down
12 changes: 6 additions & 6 deletions sdks/python/apache_beam/internal/metrics/cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ def __init__(self, bucket_type):
def reset(self):
self.data = HistogramAggregator(self._bucket_type).identity_element()

def combine(self, other: HistogramCell) -> HistogramCell:
def combine(self, other: 'HistogramCell') -> 'HistogramCell':
result = HistogramCell(self._bucket_type)
result.data = self.data.combine(other.data)
return result

def update(self, value):
self.data.histogram.record(value)

def get_cumulative(self) -> HistogramData:
def get_cumulative(self) -> 'HistogramData':
return self.data.get_cumulative()

def to_runner_api_monitoring_info(self, name, transform_id):
Expand All @@ -90,7 +90,7 @@ def __hash__(self):


class HistogramResult(object):
def __init__(self, data: HistogramData) -> None:
def __init__(self, data: 'HistogramData') -> None:
self.data = data

def __eq__(self, other):
Expand Down Expand Up @@ -139,10 +139,10 @@ def __hash__(self):
def __repr__(self):
return 'HistogramData({})'.format(self.histogram.get_percentile_info())

def get_cumulative(self) -> HistogramData:
def get_cumulative(self) -> 'HistogramData':
return HistogramData(self.histogram)

def combine(self, other: Optional[HistogramData]) -> HistogramData:
def combine(self, other: Optional['HistogramData']) -> 'HistogramData':
if other is None:
return self

Expand All @@ -156,7 +156,7 @@ class HistogramAggregator(MetricAggregator):

Values aggregated should be ``HistogramData`` objects.
"""
def __init__(self, bucket_type: BucketType) -> None:
def __init__(self, bucket_type: 'BucketType') -> None:
self._bucket_type = bucket_type

def identity_element(self) -> HistogramData:
Expand Down
20 changes: 10 additions & 10 deletions sdks/python/apache_beam/internal/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ def counter(
def histogram(
namespace: Union[Type, str],
name: str,
bucket_type: BucketType,
logger: Optional[MetricLogger] = None) -> Metrics.DelegatingHistogram:
bucket_type: 'BucketType',
logger: Optional['MetricLogger'] = None) -> 'Metrics.DelegatingHistogram':
"""Obtains or creates a Histogram metric.

Args:
Expand All @@ -109,8 +109,8 @@ class DelegatingHistogram(Histogram):
def __init__(
self,
metric_name: MetricName,
bucket_type: BucketType,
logger: Optional[MetricLogger]) -> None:
bucket_type: 'BucketType',
logger: Optional['MetricLogger']) -> None:
super().__init__(metric_name)
self.metric_name = metric_name
self.cell_type = HistogramCellFactory(bucket_type)
Expand All @@ -126,23 +126,23 @@ def update(self, value: object) -> None:
class MetricLogger(object):
"""Simple object to locally aggregate and log metrics."""
def __init__(self) -> None:
self._metric: Dict[MetricName, MetricCell] = {}
self._metric: Dict[MetricName, 'MetricCell'] = {}
self._lock = threading.Lock()
self._last_logging_millis = int(time.time() * 1000)
self.minimum_logging_frequency_msec = 180000

def update(
self,
cell_type: Union[Type[MetricCell], MetricCellFactory],
cell_type: Union[Type['MetricCell'], 'MetricCellFactory'],
metric_name: MetricName,
value: object) -> None:
cell = self._get_metric_cell(cell_type, metric_name)
cell.update(value)

def _get_metric_cell(
self,
cell_type: Union[Type[MetricCell], MetricCellFactory],
metric_name: MetricName) -> MetricCell:
cell_type: Union[Type['MetricCell'], 'MetricCellFactory'],
metric_name: MetricName) -> 'MetricCell':
with self._lock:
if metric_name not in self._metric:
self._metric[metric_name] = cell_type()
Expand Down Expand Up @@ -187,7 +187,7 @@ def __init__(
self.base_labels = base_labels if base_labels else {}
self.request_count_urn = request_count_urn

def call(self, status: Union[int, str, HttpError]) -> None:
def call(self, status: Union[int, str, 'HttpError']) -> None:
"""Record the status of the call into appropriate metrics."""
canonical_status = self.convert_to_canonical_status_string(status)
additional_labels = {monitoring_infos.STATUS_LABEL: canonical_status}
Expand All @@ -200,7 +200,7 @@ def call(self, status: Union[int, str, HttpError]) -> None:
request_counter.inc()

def convert_to_canonical_status_string(
self, status: Union[int, str, HttpError]) -> str:
self, status: Union[int, str, 'HttpError']) -> str:
"""Converts a status to a canonical GCP status cdoe string."""
http_status_code = None
if isinstance(status, int):
Expand Down
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/internal/module_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import re
import sys
from typing import Type


class TopClass(object):
Expand Down Expand Up @@ -64,7 +63,7 @@ def get(self):
class RecursiveClass(object):
"""A class that contains a reference to itself."""

SELF_TYPE: Type[RecursiveClass] = None
SELF_TYPE = None

def __init__(self, datum):
self.datum = 'RecursiveClass:%s' % datum
Expand Down
8 changes: 2 additions & 6 deletions sdks/python/apache_beam/io/azure/blobstoragefilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ def create(
self,
path,
mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO) -> BinaryIO:
# noqa: F821

compression_type=CompressionTypes.AUTO):
"""Returns a write channel for the given file path.

Args:
Expand All @@ -168,9 +166,7 @@ def open(
self,
path,
mime_type='application/octet-stream',
compression_type=CompressionTypes.AUTO) -> BinaryIO:
# noqa: F821

compression_type=CompressionTypes.AUTO):
"""Returns a read channel for the given file path.

Args:
Expand Down
25 changes: 14 additions & 11 deletions sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES.
"""

from typing import Any
from typing import Dict

# BigQuery types as listed in
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
# with aliases (RECORD, BOOLEAN, FLOAT, INTEGER) as defined in
Expand Down Expand Up @@ -63,20 +66,20 @@


def get_record_schema_from_dict_table_schema(
schema_name: Text,
table_schema: Dict[Text, Any],
namespace: Text = "apache_beam.io.gcp.bigquery") -> Dict[Text, Any]:
schema_name: str,
table_schema: Dict[str, Any],
namespace: str = "apache_beam.io.gcp.bigquery") -> Dict[str, Any]:
# noqa: F821

"""Convert a table schema into an Avro schema.

Args:
schema_name (Text): The name of the record.
table_schema (Dict[Text, Any]): A BigQuery table schema in dict form.
namespace (Text): The namespace of the Avro schema.
schema_name (str): The name of the record.
table_schema (Dict[str, Any]): A BigQuery table schema in dict form.
namespace (str): The namespace of the Avro schema.

Returns:
Dict[Text, Any]: The schema as an Avro RecordSchema.
Dict[str, Any]: The schema as an Avro RecordSchema.
"""
avro_fields = [
table_field_to_avro_field(field, ".".join((namespace, schema_name)))
Expand All @@ -92,17 +95,17 @@ def get_record_schema_from_dict_table_schema(
}


def table_field_to_avro_field(table_field: Dict[Text, Any],
namespace: str) -> Dict[Text, Any]:
def table_field_to_avro_field(table_field: Dict[str, Any],
namespace: str) -> Dict[str, Any]:
# noqa: F821

"""Convert a BigQuery field to an avro field.

Args:
table_field (Dict[Text, Any]): A BigQuery field in dict form.
table_field (Dict[str, Any]): A BigQuery field in dict form.

Returns:
Dict[Text, Any]: An equivalent Avro field in dict form.
Dict[str, Any]: An equivalent Avro field in dict form.
"""
assert "type" in table_field, \
"Unable to get type for table field {}".format(table_field)
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_schema_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@


def generate_user_type_from_bq_schema(
the_table_schema, selected_fields: bigquery.TableSchema = None) -> type:
the_table_schema, selected_fields: 'bigquery.TableSchema' = None) -> type:
"""Convert a schema of type TableSchema into a pcollection element.
Args:
the_table_schema: A BQ schema of type TableSchema
Expand Down
9 changes: 4 additions & 5 deletions sdks/python/apache_beam/io/gcp/datastore/v1new/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from typing import Iterable
from typing import List
from typing import Optional
from typing import Text
from typing import Union

from google.cloud.datastore import entity
Expand Down Expand Up @@ -155,10 +154,10 @@ def __repr__(self):
class Key(object):
def __init__(
self,
path_elements: List[Union[Text, int]],
parent: Optional[Key] = None,
project: Optional[Text] = None,
namespace: Optional[Text] = None):
path_elements: List[Union[str, int]],
parent: Optional['Key'] = None,
project: Optional[str] = None,
namespace: Optional[str] = None):
"""
Represents a Datastore key.

Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/gcp/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def __repr__(self):
return 'PubsubMessage(%s, %s)' % (self.data, self.attributes)

@staticmethod
def _from_proto_str(proto_msg: bytes) -> PubsubMessage:
def _from_proto_str(proto_msg: bytes) -> 'PubsubMessage':
"""Construct from serialized form of ``PubsubMessage``.

Args:
Expand Down Expand Up @@ -183,7 +183,7 @@ def _to_proto_str(self, for_publish=False):
return serialized

@staticmethod
def _from_message(msg: Any) -> PubsubMessage:
def _from_message(msg: Any) -> 'PubsubMessage':
"""Construct from ``google.cloud.pubsub_v1.subscriber.message.Message``.

https://googleapis.github.io/google-cloud-python/latest/pubsub/subscriber/api/message.html
Expand Down
6 changes: 3 additions & 3 deletions sdks/python/apache_beam/io/iobase.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def get_range_tracker(
self,
start_position: Optional[Any],
stop_position: Optional[Any],
) -> RangeTracker:
) -> 'RangeTracker':
"""Returns a RangeTracker for a given position range.

Framework may invoke ``read()`` method with the RangeTracker object returned
Expand Down Expand Up @@ -1281,7 +1281,7 @@ def current_restriction(self):
"""
raise NotImplementedError

def current_progress(self) -> RestrictionProgress:
def current_progress(self) -> 'RestrictionProgress':
"""Returns a RestrictionProgress object representing the current progress.

This API is recommended to be implemented. The runner can do a better job
Expand Down Expand Up @@ -1471,7 +1471,7 @@ def fraction_remaining(self) -> float:
else:
return float(self._remaining) / self.total_work

def with_completed(self, completed: int) -> RestrictionProgress:
def with_completed(self, completed: int) -> 'RestrictionProgress':
return RestrictionProgress(
fraction=self._fraction, remaining=self._remaining, completed=completed)

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/restriction_trackers.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def split(self, desired_num_offsets_per_split, min_num_offsets_per_split=1):
yield OffsetRange(current_split_start, current_split_stop)
current_split_start = current_split_stop

def split_at(self, split_pos) -> Tuple[OffsetRange, OffsetRange]:
def split_at(self, split_pos) -> Tuple['OffsetRange', 'OffsetRange']:
return OffsetRange(self.start, split_pos), OffsetRange(split_pos, self.stop)

def new_tracker(self):
Expand Down
Loading
Loading