Skip to content

Commit

Permalink
Revert three commits related to supporting custom coder in reshuffle
Browse files Browse the repository at this point in the history
- Fix custom coder not being used in Reshuffle (global window) (#33339)
- Fix custom coders not being used in Reshuffle (non global window) (#33363)
- Add missing to_type_hint to WindowedValueCoder (#33403)
  • Loading branch information
shunping committed Dec 18, 2024
1 parent e68a79c commit 4cbf257
Show file tree
Hide file tree
Showing 7 changed files with 2 additions and 119 deletions.
11 changes: 0 additions & 11 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -1438,17 +1438,6 @@ def __hash__(self):
return hash(
(self.wrapped_value_coder, self.timestamp_coder, self.window_coder))

@classmethod
def from_type_hint(cls, typehint, registry):
    # type: (Any, CoderRegistry) -> WindowedValueCoder

    """Build a coder from a windowed-value type hint.

    Only ``typehint.inner_type`` is consulted: its coder is looked up in
    ``registry`` and handed to ``cls`` as the single constructor argument.
    """
    # Ideally this'd take two parameters so that one could hint at
    # the window type as well instead of falling back to the
    # pickle coders.
    value_coder = registry.get_coder(typehint.inner_type)
    return cls(value_coder)

def to_type_hint(self):
    """Return ``typehints.WindowedValue`` indexed by the wrapped coder's hint."""
    inner_hint = self.wrapped_value_coder.to_type_hint()
    return typehints.WindowedValue[inner_hint]


Coder.register_structured_urn(
common_urns.coders.WINDOWED_VALUE.urn, WindowedValueCoder)
Expand Down
6 changes: 0 additions & 6 deletions sdks/python/apache_beam/coders/coders_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,6 @@ def test_numpy_int(self):
_ = indata | "CombinePerKey" >> beam.CombinePerKey(sum)


class WindowedValueCoderTest(unittest.TestCase):
    """Checks the type hint reported by ``WindowedValueCoder``."""
    def test_to_type_hint(self):
        """A coder wrapping ``VarIntCoder`` reports ``WindowedValue[int]``."""
        windowed_coder = coders.WindowedValueCoder(coders.VarIntCoder())
        expected_hint = typehints.WindowedValue[int]  # type: ignore[misc]
        self.assertEqual(windowed_coder.to_type_hint(), expected_hint)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
2 changes: 0 additions & 2 deletions sdks/python/apache_beam/coders/typecoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ def register_standard_coders(self, fallback_coder):
self._register_coder_internal(str, coders.StrUtf8Coder)
self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
self._register_coder_internal(typehints.DictConstraint, coders.MapCoder)
self._register_coder_internal(
typehints.WindowedTypeConstraint, coders.WindowedValueCoder)
# Default fallback coders applied in that order until the first matching
# coder found.
default_fallback_coders = [
Expand Down
13 changes: 2 additions & 11 deletions sdks/python/apache_beam/transforms/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from typing import Callable
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from typing import TypeVar
from typing import Union
Expand Down Expand Up @@ -74,13 +73,11 @@
from apache_beam.transforms.window import TimestampedValue
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.decorators import get_signature
from apache_beam.typehints.native_type_compatibility import TypedWindowedValue
from apache_beam.typehints.sharded_key_type import ShardedKeyType
from apache_beam.utils import shared
from apache_beam.utils import windowed_value
from apache_beam.utils.annotations import deprecated
from apache_beam.utils.sharded_key import ShardedKey
from apache_beam.utils.timestamp import Timestamp

if TYPE_CHECKING:
from apache_beam.runners.pipeline_context import PipelineContext
Expand Down Expand Up @@ -956,10 +953,6 @@ def restore_timestamps(element):
window.GlobalWindows.windowed_value((key, value), timestamp)
for (value, timestamp) in values
]

ungrouped = pcoll | Map(reify_timestamps).with_input_types(
Tuple[K, V]).with_output_types(
Tuple[K, Tuple[V, Optional[Timestamp]]])
else:

# typing: All conditional function variants must have identical signatures
Expand All @@ -973,8 +966,7 @@ def restore_timestamps(element):
key, windowed_values = element
return [wv.with_value((key, wv.value)) for wv in windowed_values]

ungrouped = pcoll | Map(reify_timestamps).with_input_types(
Tuple[K, V]).with_output_types(Tuple[K, TypedWindowedValue[V]])
ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any)

# TODO(https://github.com/apache/beam/issues/19785) Using global window as
# one of the standard window. This is to mitigate the Dataflow Java Runner
Expand Down Expand Up @@ -1026,8 +1018,7 @@ def expand(self, pcoll):
pcoll | 'AddRandomKeys' >>
Map(lambda t: (random.randrange(0, self.num_buckets), t)
).with_input_types(T).with_output_types(Tuple[int, T])
| ReshufflePerKey().with_input_types(Tuple[int, T]).with_output_types(
Tuple[int, T])
| ReshufflePerKey()
| 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types(
Tuple[int, T]).with_output_types(T))

Expand Down
54 changes: 0 additions & 54 deletions sdks/python/apache_beam/transforms/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,60 +1010,6 @@ def format_with_timestamp(element, timestamp=beam.DoFn.TimestampParam):
equal_to(expected_data),
label="formatted_after_reshuffle")

global _Unpicklable
global _UnpicklableCoder

class _Unpicklable:
    """Holder for a single ``value`` whose pickling hooks always raise."""
    def __init__(self, value):
        self.value = value

    def __getstate__(self):
        """Refuse to produce pickle state."""
        raise NotImplementedError()

    def __setstate__(self, state):
        """Refuse to restore from pickle state."""
        raise NotImplementedError()

class _UnpicklableCoder(beam.coders.Coder):
    """Coder for ``_Unpicklable`` that carries its ``value`` as encoded text."""
    def encode(self, value):
        """Return ``str(value.value)`` encoded to bytes."""
        text = str(value.value)
        return text.encode()

    def decode(self, encoded):
        """Rebuild an ``_Unpicklable`` from the integer text in ``encoded``."""
        number = int(encoded.decode())
        return _Unpicklable(number)

    def to_type_hint(self):
        """Return the class this coder handles."""
        return _Unpicklable

    def is_deterministic(self):
        """Report this coder as deterministic."""
        return True

def test_reshuffle_unpicklable_in_global_window(self):
    """Reshuffle ``_Unpicklable`` elements in the global window.

    ``_UnpicklableCoder`` is registered for ``_Unpicklable`` first; the
    elements' pickling hooks raise, so the pipeline presumably only passes
    when that registered coder is the one used by ``Reshuffle``.
    """
    beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder)

    elements = [_Unpicklable(n) for n in range(5)]
    # Each element's value comes back multiplied by ten.
    expected = [n * 10 for n in range(5)]

    with TestPipeline() as pipeline:
        windowed = (
            pipeline
            | beam.Create(elements)
            | beam.WindowInto(GlobalWindows()))
        scaled = (
            windowed
            | beam.Reshuffle()
            | beam.Map(lambda u: u.value * 10))
        assert_that(scaled, equal_to(expected))

def test_reshuffle_unpicklable_in_non_global_window(self):
    """Reshuffle ``_Unpicklable`` elements under sliding windows.

    With ``SlidingWindows(size=3, period=1)`` the expected output lists
    every scaled value three times.
    """
    beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder)

    elements = [_Unpicklable(n) for n in range(5)]
    # Three copies of each scaled value: 0, 0, 0, 10, 10, 10, ...
    expected = [n * 10 for n in range(5) for _ in range(3)]

    with TestPipeline() as pipeline:
        windowed = (
            pipeline
            | beam.Create(elements)
            | beam.WindowInto(window.SlidingWindows(size=3, period=1)))
        scaled = (
            windowed
            | beam.Reshuffle()
            | beam.Map(lambda u: u.value * 10))
        assert_that(scaled, equal_to(expected))


class WithKeysTest(unittest.TestCase):
def setUp(self):
Expand Down
26 changes: 0 additions & 26 deletions sdks/python/apache_beam/typehints/native_type_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,9 @@
import sys
import types
import typing
from typing import Generic
from typing import TypeVar

from apache_beam.typehints import typehints

T = TypeVar('T')

_LOGGER = logging.getLogger(__name__)

# Describes an entry in the type map in convert_to_beam_type.
Expand Down Expand Up @@ -220,18 +216,6 @@ def convert_collections_to_typing(typ):
return typ


# During type inference of WindowedValue, we need to pass in the inner value
# type. This cannot be done directly with the WindowedValue class because it
# is not parameterized. Changing it to a generic class (e.g. WindowedValue[T])
# could work in theory. However, the class is cythonized, and it seems that
# Cython does not handle generic classes well.
# The workaround here is to create a separate class solely for type
# inference. This class should never be used for creating instances.
class TypedWindowedValue(Generic[T]):
    """Generic placeholder used only in type hints; it cannot be instantiated."""
    def __init__(self, *args, **kwargs):
        # Any attempt to construct an instance is rejected.
        raise NotImplementedError("This class is solely for type inference")


def convert_to_beam_type(typ):
"""Convert a given typing type to a Beam type.
Expand Down Expand Up @@ -283,12 +267,6 @@ def convert_to_beam_type(typ):
# TODO(https://github.com/apache/beam/issues/20076): Currently unhandled.
_LOGGER.info('Converting NewType type hint to Any: "%s"', typ)
return typehints.Any
elif typ_module == 'apache_beam.typehints.native_type_compatibility' and \
getattr(typ, "__name__", typ.__origin__.__name__) == 'TypedWindowedValue':
# Need to pass through WindowedValue class so that it can be converted
# to the correct type constraint in Beam
# This is needed to fix https://github.com/apache/beam/issues/33356
pass
elif (typ_module != 'typing') and (typ_module != 'collections.abc'):
# Only translate types from the typing and collections.abc modules.
return typ
Expand Down Expand Up @@ -346,10 +324,6 @@ def convert_to_beam_type(typ):
match=_match_is_exactly_collection,
arity=1,
beam_type=typehints.Collection),
_TypeMapEntry(
match=_match_issubclass(TypedWindowedValue),
arity=1,
beam_type=typehints.WindowedValue),
]

# Find the first matching entry.
Expand Down
9 changes: 0 additions & 9 deletions sdks/python/apache_beam/typehints/typehints.py
Original file line number Diff line number Diff line change
Expand Up @@ -1213,15 +1213,6 @@ def type_check(self, instance):
repr(self.inner_type),
instance.value.__class__.__name__))

def bind_type_variables(self, bindings):
    """Return this hint with ``bindings`` applied to its inner type.

    When binding leaves the inner type equal to the current one, ``self`` is
    returned as-is; otherwise a new ``WindowedValue`` hint is built around
    the bound inner type.
    """
    resolved = bind_type_variables(self.inner_type, bindings)
    return self if resolved == self.inner_type else WindowedValue[resolved]

def __repr__(self):
    """Render as ``WindowedValue[<repr of inner type>]``."""
    inner_repr = repr(self.inner_type)
    return 'WindowedValue[%s]' % inner_repr


class GeneratorHint(IteratorHint):
"""A Generator type hint.
Expand Down

0 comments on commit 4cbf257

Please sign in to comment.