numpy.int64 type is not serialized correctly in Python 3.11 and Python 3.12 #33137

Merged: 6 commits, Nov 18, 2024
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/coders/coder_impl.py
@@ -510,6 +510,9 @@ def encode_special_deterministic(self, value, stream):
      stream.write_byte(NESTED_STATE_TYPE)
      self.encode_type(type(value), stream)
      state_value = value.__getstate__()
Contributor:
I am concerned that just checking that this value is not None is insufficient, as objects with custom pickling protocols can still use this method, and the root object now defines a default for anything with __dict__ or __slots__.

Collaborator Author:
I agree it is insufficient.

Contributor:
      if value is not None and state_value is None:
Contributor:
Why are we checking that value is not None here? I don't think we want to use this protocol for None.

Collaborator Author:
This just indicates that __getstate__ might not be implemented, and we know that is the case whenever this if check passes.

        # https://github.com/apache/beam/issues/33020
        raise TypeError(self._deterministic_encoding_error_msg(value))
      try:
        self.encode_to_stream(state_value, stream, True)
      except Exception as e:
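
The guard in this hunk can be read in isolation roughly as follows. This is a simplified sketch under stated assumptions, not the actual coder: `checked_state` and `ExplicitState` are hypothetical names, the error text is made up, and the real method writes to a stream rather than returning the state.

```python
class ExplicitState:
  """Hypothetical class with a custom pickling protocol."""
  def __init__(self, x):
    self.x = x

  def __getstate__(self):
    return {"x": self.x}

  def __setstate__(self, state):
    self.x = state["x"]


def checked_state(value):
  """Return the state to encode, or raise TypeError if there is none.

  Assumption: mirrors only the None check from the diff; everything else
  about deterministic encoding is left out.
  """
  if not hasattr(value, "__getstate__"):
    raise TypeError("no __getstate__ on %r" % type(value))
  state_value = value.__getstate__()
  if value is not None and state_value is None:
    # A None state means there is nothing meaningful to encode.
    raise TypeError(
        "unable to deterministically encode %r of type %r" %
        (value, type(value)))
  return state_value


if __name__ == "__main__":
  print(checked_state(ExplicitState(3)))
  try:
    checked_state(7)
  except TypeError as e:
    print("rejected:", e)
```

A plain int is rejected on every Python version here: before 3.11 because the method is missing, and from 3.11 on because the default state is None.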
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/coders/coders_test.py
@@ -23,11 +23,13 @@
import proto
import pytest

import apache_beam as beam
from apache_beam import typehints
from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
from apache_beam.coders import coders
from apache_beam.coders.avro_record import AvroRecord
from apache_beam.coders.typecoders import registry as coders_registry
from apache_beam.testing.test_pipeline import TestPipeline


class PickleCoderTest(unittest.TestCase):
@@ -242,6 +244,20 @@ def test_to_type_hint(self):
    assert coder.to_type_hint() is bytes


class NumpyIntAsKeyTest(unittest.TestCase):
  def test_numpy_int(self):
    # This type (numpy.int64) is not supported as a key.
    import numpy as np

    with self.assertRaises(TypeError):
      with TestPipeline() as p:
        indata = p | "Create" >> beam.Create([(a, int(a))
                                              for a in np.arange(3)])

        # Apply CombinePerKey to sum values for each key.
        _ = indata | "CombinePerKey" >> beam.CombinePerKey(sum)


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
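
Since the test above expects a TypeError for numpy.int64 keys, one possible user-side workaround is to convert keys to built-in Python values before grouping, much as the test already does for the values with `int(a)`. The sketch below is an assumption-laden illustration, not part of this PR: `to_builtin_key` and `FakeScalar` are hypothetical names, and `FakeScalar` stands in for a NumPy scalar so the snippet runs without NumPy. It relies only on NumPy scalars exposing an `item()` method.

```python
class FakeScalar:
  """Hypothetical stand-in for a NumPy scalar such as numpy.int64."""
  def __init__(self, value):
    self._value = value

  def item(self):
    # NumPy scalars expose item() to return the equivalent Python value.
    return self._value


def to_builtin_key(key):
  """Convert a NumPy-style scalar to a built-in value; pass others through."""
  item = getattr(key, "item", None)
  return item() if callable(item) else key


if __name__ == "__main__":
  pairs = [(FakeScalar(i), i) for i in range(3)]
  print([(to_builtin_key(k), v) for k, v in pairs])
```

In a pipeline, something like `beam.Map(lambda kv: (to_builtin_key(kv[0]), kv[1]))` placed before `CombinePerKey` could apply the same conversion, though that has not been verified against this change.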