Bugfix #340: remove mmap buffer, require msgpack>=0.5.0 (we need strict_types), consolidate msgpack usage into _msgpack_compat, add metadata size warning at 8MB (max is 16MB)
jamesmunro committed Mar 12, 2024
1 parent 0122681 commit 597a28f
Showing 4 changed files with 87 additions and 135 deletions.
49 changes: 44 additions & 5 deletions python/arcticdb/_msgpack_compat.py
@@ -4,15 +4,54 @@
This module implements a backwards compatible version of msgpack functions.
"""

import msgpack
from arcticdb.log import version as log
from arcticdb.exceptions import ArcticNativeException

ExtType = msgpack.ExtType


def _check_valid_msgpack():
pack_module = msgpack.packb.__module__
packer_module = msgpack.Packer.__module__
# Check that msgpack hasn't been monkey patched by another package
# We only support the official cmsgpack and fallback modules
if (pack_module in ("msgpack", "msgpack.fallback")) and (
packer_module in ("msgpack._packer", "msgpack.fallback", "msgpack._cmsgpack")
):
return
log.info("Unsupported msgpack variant, got: {}, {}".format(pack_module, packer_module))
raise ArcticNativeException("Unsupported msgpack variant, got: {}, {}".format(pack_module, packer_module))


def packb(obj, **kwargs):
_check_valid_msgpack()
    # use_bin_type is supported from msgpack==0.4.0 but only defaults to True in later versions
return msgpack.packb(obj, use_bin_type=True, strict_types=True, **kwargs)

packb.__doc__ = msgpack.packb.__doc__
packb.__name__ = msgpack.packb.__name__


def padded_packb(obj, **kwargs):
"""msgpack.packb with some defaults across msgpack versions and padded to 8 bytes
returns: (packed bytes, nbytes of unpadded content)"""
_check_valid_msgpack()
    # use_bin_type is supported from msgpack==0.4.0 but only defaults to True in later versions
# don't reset the buffer so we can append padding bytes
packer = msgpack.Packer(autoreset=False, use_bin_type=True, strict_types=True, **kwargs)
packer.pack(obj)
nbytes = packer.getbuffer().nbytes
pad = -nbytes % 8 # next multiple of 8 bytes
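    # e.g. nbytes == 13  ->  pad == -13 % 8 == 3, so the padded buffer is 16 bytes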
[packer.pack(None) for _ in range(pad)] # None is packed as single byte b`\xc0`
assert packer.getbuffer().nbytes % 8 == 0
return packer.bytes(), nbytes


def unpackb(packed, **kwargs):
    if msgpack.version >= (0, 6, 0):
        kwargs.setdefault("strict_map_key", False)
    return msgpack.unpackb(packed, **kwargs)


unpackb.__doc__ = msgpack.unpackb.__doc__
unpackb.__name__ = msgpack.unpackb.__name__
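For reference, a short usage sketch of the helpers above (illustrative only, not part of the commit; assumes a reasonably recent msgpack and the import path used in this diff):

from arcticdb._msgpack_compat import packb, padded_packb, unpackb

obj = {"key": b"value", "n": 42}

packed = packb(obj)                  # use_bin_type=True and strict_types=True are applied for us
assert unpackb(packed, raw=False) == obj

padded, nbytes = padded_packb(obj)   # padded with msgpack nil bytes (b"\xc0") to a multiple of 8
assert len(padded) % 8 == 0
assert unpackb(padded[:nbytes], raw=False) == obj   # only the first nbytes are real payload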
158 changes: 37 additions & 121 deletions python/arcticdb/version_store/_normalization.py
@@ -20,7 +20,6 @@
from pandas.api.types import is_integer_dtype
from arcticc.pb2.descriptors_pb2 import UserDefinedMetadata, NormalizationMetadata, MsgPackSerialization
from arcticc.pb2.storage_pb2 import VersionStoreConfig
from collections import Counter
from arcticdb.exceptions import ArcticNativeException, ArcticDbNotYetImplemented, NormalizationException, SortingException
from arcticdb.supported_types import DateRangeInput, time_types as supported_time_types
@@ -32,8 +31,8 @@
from pandas import DataFrame, MultiIndex, Series, DatetimeIndex, Index, RangeIndex
from typing import NamedTuple, List, Union, Mapping, Any, TypeVar, Tuple

from arcticdb._msgpack_compat import packb, padded_packb, unpackb, ExtType
from arcticdb.log import version as log
from arcticdb.version_store._common import _column_name_to_strings, TimeFrame

PICKLE_PROTOCOL = 4
@@ -54,11 +53,6 @@ def check_is_utc_if_newer_pandas(*args, **kwargs):
return False # the UTC specific issue is not present in old Pandas so no need to go down special case


IS_WINDOWS = sys.platform == "win32"


@@ -277,7 +271,7 @@ def _to_tz_timestamp(dt):
microsecond=dt.microsecond,
).value
tz = dt.tzinfo.zone if dt.tzinfo is not None else None
    return [ts, tz]


def _from_tz_timestamp(ts, tz):
@@ -912,75 +906,22 @@ def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_col
)


def check_valid_msgpack(pack_module, packer_module):
if (pack_module in ("msgpack", "msgpack.fallback")) and (
packer_module in ("msgpack._packer", "msgpack.fallback", "msgpack._cmsgpack")
):
return
log.info("Unsupported msgpack variant, got: {}, {}".format(pack_module, packer_module))
raise ArcticNativeException("Unsupported msgpack variant, got: {}, {}".format(pack_module, packer_module))


def _pack(*args, **kwargs):
from msgpack import Packer

check_valid_msgpack(pack.__module__, Packer.__module__)

return pack(*args, **kwargs)


def _packb(*args, **kwargs):
from msgpack import Packer

check_valid_msgpack(packb.__module__, Packer.__module__)

return packb(*args, **kwargs)


class MsgPackNormalizer(Normalizer):
"""
Fall back plan for the time being to store arbitrary data
"""

MSG_PACK_MAX_SIZE = (1 << 32) + 1024
MMAP_DEFAULT_SIZE = (
MSG_PACK_MAX_SIZE # Allow up to 4 gib pickles in msgpack by default, most of these compress fairly well.
)
# msgpack checks whether the size of pickled data within 1 << 32 - 1 byte only
# Extra memory is needed in mmap for msgpack's overhead

def __init__(self, cfg=None):
self._size = MsgPackNormalizer.MMAP_DEFAULT_SIZE if cfg is None else cfg.max_blob_size
self.MSG_PACK_MAX_SIZE = self._size # Override with the max_pickle size if set in config.
self.strict_mode = cfg.strict_mode if cfg is not None else False

def normalize(self, obj, **kwargs):
        packed, nbytes = self._msgpack_padded_packb(obj)

        norm_meta = NormalizationMetadata()
        norm_meta.msg_pack_frame.version = 1

        norm_meta.msg_pack_frame.size_bytes = nbytes

        # FUTURE(#263): do we need to care about byte ordering?
        column_val = np.array(memoryview(packed), np.uint8).view(np.uint64)

return NormalizedInput(
item=NPDDataFrame(
@@ -999,84 +940,57 @@ def denormalize(self, obj, meta):
raise ArcticNativeException("Expected msg_pack_frame input, actual {}".format(meta))
sb = meta.msg_pack_frame.size_bytes
col_data = obj.data[0].view(np.uint8)[:sb]
        return self._msgpack_unpackb(memoryview(col_data))

def _custom_pack(self, obj):
if isinstance(obj, pd.Timestamp):
tz = obj.tz.zone if obj.tz is not None else None
            return ExtType(MsgPackSerialization.PD_TIMESTAMP, packb([obj.value, tz]))

if isinstance(obj, datetime.datetime):
            return ExtType(MsgPackSerialization.PY_DATETIME, packb(_to_tz_timestamp(obj)))

if isinstance(obj, datetime.timedelta):
            return ExtType(MsgPackSerialization.PY_TIMEDELTA, packb(pd.Timedelta(obj).value))

if self.strict_mode:
raise TypeError("Normalisation is running in strict mode, writing pickled data is disabled.")
else:
            return ExtType(MsgPackSerialization.PY_PICKLE_3, packb(Pickler.write(obj)))

def _ext_hook(self, code, data):
if code == MsgPackSerialization.PD_TIMESTAMP:
            data = unpackb(data)
return pd.Timestamp(data[0], tz=data[1]) if data[1] is not None else pd.Timestamp(data[0])

if code == MsgPackSerialization.PY_DATETIME:
            data = unpackb(data)
return _from_tz_timestamp(data[0], data[1])

if code == MsgPackSerialization.PY_TIMEDELTA:
            data = unpackb(data)
return pd.Timedelta(data).to_pytimedelta()

if code == MsgPackSerialization.PY_PICKLE_2:
# If stored in Python2 we want to use raw while unpacking.
# https://github.com/msgpack/msgpack-python/blob/master/msgpack/_unpacker.pyx#L230
            data = unpackb(data, raw=True)
return Pickler.read(data, pickled_in_python2=True)

if code == MsgPackSerialization.PY_PICKLE_3:
            data = unpackb(data, raw=False)
return Pickler.read(data, pickled_in_python2=False)

return ExtType(code, data)

def _msgpack_pack(self, obj, buff):
try:
_pack(obj, buff, use_bin_type=True, default=self._custom_pack, strict_types=True)
except TypeError:
# Some ancient versions of msgpack don't support strict_types, fallback to the pack without that arg.
_pack(obj, buff, use_bin_type=True, default=self._custom_pack)

def _msgpack_unpack(self, buff, raw=False):
return _msgpack_compat.unpackb(
buff,
raw=raw,
ext_hook=self._ext_hook,
max_ext_len=MsgPackNormalizer.MSG_PACK_MAX_SIZE,
max_bin_len=MsgPackNormalizer.MSG_PACK_MAX_SIZE,
)
def _msgpack_packb(self, obj):
return packb(obj, default=self._custom_pack)

def _msgpack_padded_packb(self, obj):
return padded_packb(obj, default=self._custom_pack)

def _msgpack_unpackb(self, buff, raw=False):
return unpackb(buff, raw=raw, ext_hook=self._ext_hook)
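The default=/ext_hook= pair used above is msgpack's standard extension-type mechanism. A self-contained sketch of the same round-trip pattern (the ext code below is a made-up placeholder, not ArcticDB's MsgPackSerialization constants):

import msgpack
import pandas as pd

PD_TIMESTAMP = 1  # hypothetical ext type code, for illustration only

def default(obj):
    # called by packb for types msgpack cannot serialise natively (or that strict_types rejects)
    if isinstance(obj, pd.Timestamp):
        tz = obj.tz.zone if obj.tz is not None else None
        return msgpack.ExtType(PD_TIMESTAMP, msgpack.packb([obj.value, tz], use_bin_type=True))
    raise TypeError("cannot serialize {!r}".format(obj))

def ext_hook(code, data):
    # called by unpackb whenever an ExtType is encountered
    if code == PD_TIMESTAMP:
        value, tz = msgpack.unpackb(data, raw=False)
        return pd.Timestamp(value, tz=tz) if tz is not None else pd.Timestamp(value)
    return msgpack.ExtType(code, data)

ts = pd.Timestamp("2024-03-12 09:00", tz="Europe/London")
packed = msgpack.packb(ts, default=default, use_bin_type=True, strict_types=True)
assert msgpack.unpackb(packed, ext_hook=ext_hook, raw=False) == ts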


class Pickler(object):
@@ -1284,7 +1198,8 @@ def denormalize(self, item, norm_meta):
normalize = _NORMALIZER.normalize
denormalize = _NORMALIZER.denormalize

_MAX_USER_DEFINED_META = 16 << 20  # 16MB
_WARN_USER_DEFINED_META = 8 << 20 # 8MB


def _init_msgpack_metadata():
@@ -1296,34 +1211,35 @@
_msgpack_metadata = _init_msgpack_metadata()


def normalize_metadata(d):
# type: (Mapping[string, Any])->NormalizationMetadata.UserDefinedMetadata
    # Prevent arbitrarily large object serialization
# as it will slow down the indexing read side
# which is not a good idea.
# A subsequent improvement could remove that limitation
# using an extra indirection and point to the blob key
# However, this is also a probable sign of poor data modelling
# and understanding the need should be a priority before
# removing this protection.
packed = _msgpack_metadata._msgpack_packb(d)
size = len(packed)
if size > _MAX_USER_DEFINED_META:
raise ArcticDbNotYetImplemented("User defined metadata cannot exceed {}B".format(_MAX_USER_DEFINED_META))
if size > _WARN_USER_DEFINED_META:
log.warn('User defined metadata is above warning size ({0}B), metadata cannot exceed {1}B. Current size: {2}B.'\
.format(_WARN_USER_DEFINED_META, _MAX_USER_DEFINED_META, size))

udm = UserDefinedMetadata()
    udm.inline_payload = packed
return udm
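To make the new limits concrete, here is roughly how normalize_metadata now behaves for different payload sizes (sizes refer to the msgpack-serialised blob; this snippet is illustrative and not part of the diff):

normalize_metadata({"run_id": 1, "notes": "small"})       # well under 8MB: stored inline, no warning
normalize_metadata({"blob": b"x" * (9 * 1024 * 1024)})    # between 8MB and 16MB: stored, but log.warn fires
normalize_metadata({"blob": b"x" * (17 * 1024 * 1024)})   # over 16MB: raises ArcticDbNotYetImplemented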


def denormalize_user_metadata(udm, ext_obj=None):
# type: (NormalizationMetadata.UserDefinedMetadata, Optional[buffer])->Mapping[string,Any]
storage_type = udm.WhichOneof("storage_type")
if storage_type == "inline_payload":
        return _msgpack_metadata._msgpack_unpackb(udm.inline_payload)
elif storage_type is None:
return None
else:
13 changes: 5 additions & 8 deletions python/tests/unit/arcticdb/version_store/test_normalization.py
@@ -81,13 +81,6 @@ def test_fails_humongous_meta():
normalize_metadata(meta)


def test_fails_humongous_data():
norm = test_msgpack_normalizer
with pytest.raises(ArcticDbNotYetImplemented):
big = [1] * (norm.MMAP_DEFAULT_SIZE + 1)
norm.normalize(big)


def test_empty_df():
if IS_PANDAS_ZERO:
d = pd.DataFrame(data={"C": []}, index=pd.MultiIndex(levels=[["A"], ["B"]], labels=[[], []], names=["X", "Y"]))
@@ -346,12 +339,16 @@ def test_ndarray_arbitrary_shape():


def test_dict_with_tuples():
    # This has to be pickled because msgpack doesn't differentiate between tuples and lists
data = {(1, 2): [1, 24, 55]}
norm = CompositeNormalizer(use_norm_failure_handler_known_types=False, fallback_normalizer=test_msgpack_normalizer)
df, norm_meta = norm.normalize(data)
fd = FrameData.from_npd_df(df)
denormalized_data = norm.denormalize(fd, norm_meta)
assert denormalized_data == data
assert isinstance(denormalized_data, dict)
assert isinstance(next(iter(denormalized_data)), tuple)
assert isinstance(denormalized_data[(1, 2)], list)
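For context on why strict_types matters for this test, a stand-alone msgpack sketch (not part of the test suite; needs a reasonably recent msgpack for the strict_map_key argument) of the lossy round-trip the pickle fallback avoids:

import msgpack

data = {(1, 2): [1, 24, 55]}

# Without strict_types the tuple key is silently packed as a msgpack array...
packed = msgpack.packb(data, use_bin_type=True)

# ...and unpacking has to map arrays back to tuples, which also turns the list value
# into a tuple, so the round-trip is lossy.
assert msgpack.unpackb(packed, raw=False, use_list=False, strict_map_key=False) == {(1, 2): (1, 24, 55)}

# With strict_types=True (the reason this commit requires msgpack>=0.5.0) the tuple is
# rejected and handed to the `default` hook instead, which lets the normalizer pickle it
# and round-trip the exact types, as the asserts above check.
try:
    msgpack.packb(data, use_bin_type=True, strict_types=True)
except TypeError as exc:
    print("strict_types rejects tuples:", exc)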


def test_will_item_be_pickled(lmdb_version_store, sym):
Expand All @@ -371,7 +368,7 @@ def test_will_item_be_pickled(lmdb_version_store, sym):

def test_numpy_ts_col_with_none(lmdb_version_store):
df = pd.DataFrame(data={"a": [None, None]})
df["a"][0] = pd.Timestamp(0)
df.loc[0, "a"] = pd.Timestamp(0)
norm = CompositeNormalizer(use_norm_failure_handler_known_types=False, fallback_normalizer=test_msgpack_normalizer)
df, norm_meta = norm.normalize(df)
fd = FrameData.from_npd_df(df)
2 changes: 1 addition & 1 deletion setup.cfg
@@ -39,7 +39,7 @@ install_requires =
attrs
dataclasses ; python_version < '3.7'
protobuf >=3.5.0.post1 # Per https://github.com/grpc/grpc/blob/v1.45.3/requirements.txt
    msgpack >=0.5.0 # 0.5.0 is needed for strict_types argument, which is required for our pickling behaviour
pyyaml
packaging

