Bugfix #340: remove mmap buffer, require msgpack==0.5.0 #1433

Merged · 5 commits · Apr 2, 2024
3 changes: 2 additions & 1 deletion environment_unix.yml
@@ -66,7 +66,8 @@ dependencies:
- werkzeug
- moto
- mock
- msgpack-python
# msgpack 0.5.0 is required for strict_types argument, needed for correct pickling fallback
- msgpack-python >= 0.5.0
# Pinned to avoid test disruption (in phase with the pin in setup.cfg)
# See: https://github.com/man-group/ArcticDB/pull/291
- hypothesis < 6.73
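Editorial aside (not part of the diff): the reason `strict_types` needs msgpack >= 0.5.0 and matters for the pickling fallback is that without it msgpack silently serializes subclasses of the basic container types, so objects that should be pickled are flattened and lose their type. A minimal sketch against plain msgpack, with a stand-in `default` hook:

```python
# Illustration only: the `default` hook stands in for ArcticDB's pickle fallback.
import collections
import msgpack

Point = collections.namedtuple("Point", ["x", "y"])

def default(obj):
    return {"__pickled__": repr(obj)}  # ArcticDB would pickle the object here

loose = msgpack.unpackb(
    msgpack.packb(Point(1, 2), use_bin_type=True, default=default), raw=False
)
strict = msgpack.unpackb(
    msgpack.packb(Point(1, 2), use_bin_type=True, strict_types=True, default=default), raw=False
)
print(loose)   # [1, 2] -- the namedtuple is silently flattened to a plain list
print(strict)  # {'__pickled__': 'Point(x=1, y=2)'} -- routed through the fallback
```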
48 changes: 43 additions & 5 deletions python/arcticdb/_msgpack_compat.py
@@ -4,15 +4,53 @@

This module implements a backwards compatible version of msgpack functions.
"""

import msgpack
from arcticdb.preconditions import check
from arcticdb.exceptions import ArcticNativeException

ExtType = msgpack.ExtType


def _check_valid_msgpack():
pack_module = msgpack.packb.__module__
packer_module = msgpack.Packer.__module__
# Check that msgpack hasn't been monkey patched by another package
# We only support the official cmsgpack and fallback modules
if (pack_module in ("msgpack", "msgpack.fallback")) and (
packer_module in ("msgpack._packer", "msgpack.fallback", "msgpack._cmsgpack")
):
return
raise ArcticNativeException("Unsupported msgpack variant, got: {}, {}".format(pack_module, packer_module))
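Editorial note (not part of the new file): the `__module__` check above guards against libraries that monkey-patch msgpack, which would silently change serialization behaviour. A hypothetical illustration of what would trip it:

```python
# Hypothetical example: a foreign wrapper installed over msgpack.packb.
import msgpack

def wrapped_packb(obj, **kwargs):
    return msgpack.Packer(**kwargs).pack(obj)

msgpack.packb = wrapped_packb
print(msgpack.packb.__module__)  # "__main__" rather than "msgpack" or "msgpack.fallback",
# so _check_valid_msgpack() would raise ArcticNativeException in this interpreter.
```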


def packb(obj, **kwargs):
_check_valid_msgpack()
# use_bin_type is supported from msgpack==0.4.0 but only became the default in later versions
return msgpack.packb(obj, use_bin_type=True, strict_types=True, **kwargs)

packb.__doc__ = msgpack.packb.__doc__
packb.__name__ = msgpack.packb.__name__


def padded_packb(obj, **kwargs):
"""msgpack.packb with some defaults across msgpack versions and padded to 8 bytes
returns: (packed bytes, nbytes of unpadded content)"""
_check_valid_msgpack()
# use_bin_type is supported from msgpack==0.4.0 but only became the default in later versions
# don't reset the buffer so we can append padding bytes
packer = msgpack.Packer(autoreset=False, use_bin_type=True, strict_types=True, **kwargs)
packer.pack(obj)
nbytes = packer.getbuffer().nbytes
pad = -nbytes % 8 # next multiple of 8 bytes
    [packer.pack(None) for _ in range(pad)]  # None is packed as the single byte b'\xc0'
check(packer.getbuffer().nbytes % 8 == 0, 'Error in ArcticDB padded_packb. Padding failed. nbytes={}', packer.getbuffer().nbytes)
return packer.bytes(), nbytes


def unpackb(packed, **kwargs):
    if msgpack.version >= (0, 6, 0):
        kwargs.setdefault("strict_map_key", False)
    return msgpack.unpackb(packed, **kwargs)


unpackb.__doc__ = msgpack.unpackb.__doc__
unpackb.__name__ = msgpack.unpackb.__name__
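Editorial sketch (not part of the diff): the padding arithmetic `padded_packb` relies on is `-nbytes % 8`, the number of single-byte `None` (0xc0) pads needed to reach the next multiple of 8; that alignment is what lets `_normalization.py` below reinterpret the packed buffer as uint64 without copying. A standalone version against plain msgpack:

```python
import msgpack

def padded_packb_sketch(obj):
    # Same idea as the helper above, written against plain msgpack for illustration.
    packer = msgpack.Packer(autoreset=False, use_bin_type=True, strict_types=True)
    packer.pack(obj)
    nbytes = packer.getbuffer().nbytes
    pad = -nbytes % 8              # e.g. nbytes=13 -> pad=3, nbytes=16 -> pad=0
    for _ in range(pad):
        packer.pack(None)          # None packs to the single byte 0xc0
    return packer.bytes(), nbytes

packed, nbytes = padded_packb_sketch({"a": 1})
assert len(packed) % 8 == 0        # padded length is 8-byte aligned
print(nbytes, len(packed))         # unpadded size vs. aligned size, e.g. 4 8
```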
159 changes: 37 additions & 122 deletions python/arcticdb/version_store/_normalization.py
@@ -20,7 +20,6 @@
from pandas.api.types import is_integer_dtype
from arcticc.pb2.descriptors_pb2 import UserDefinedMetadata, NormalizationMetadata, MsgPackSerialization
from arcticc.pb2.storage_pb2 import VersionStoreConfig
from mmap import mmap
from collections import Counter
from arcticdb.exceptions import ArcticNativeException, ArcticDbNotYetImplemented, NormalizationException, SortingException
from arcticdb.supported_types import DateRangeInput, time_types as supported_time_types
@@ -32,8 +31,8 @@
from pandas import DataFrame, MultiIndex, Series, DatetimeIndex, Index, RangeIndex
from typing import NamedTuple, List, Union, Mapping, Any, TypeVar, Tuple

from arcticdb import _msgpack_compat
from arcticdb.log import version
from arcticdb._msgpack_compat import packb, padded_packb, unpackb, ExtType
from arcticdb.log import version as log
from arcticdb.version_store._common import _column_name_to_strings, TimeFrame

PICKLE_PROTOCOL = 4
@@ -54,11 +53,6 @@ def check_is_utc_if_newer_pandas(*args, **kwargs):
return False # the UTC specific issue is not present in old Pandas so no need to go down special case


log = version

from msgpack import packb, unpackb, pack, ExtType


IS_WINDOWS = sys.platform == "win32"


@@ -278,7 +272,7 @@ def _to_tz_timestamp(dt):
microsecond=dt.microsecond,
).value
tz = dt.tzinfo.zone if dt.tzinfo is not None else None
return ts, tz
return [ts, tz]


def _from_tz_timestamp(ts, tz):
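Editorial note on the hunk above (not part of the diff): with `strict_types=True` (now the default in `_msgpack_compat`), msgpack only packs exact lists as arrays, so returning a tuple here would be routed through the `default` hook and into the pickle fallback; returning a list keeps the value natively packable. A standalone illustration:

```python
import msgpack

def default(obj):
    raise TypeError(f"would fall back to pickle for {type(obj).__name__}")

# An exact list is packed natively as a msgpack array.
msgpack.packb([1712059200000000000, "Europe/London"], use_bin_type=True, strict_types=True, default=default)

# A tuple is not an exact list under strict_types, so the default hook is invoked.
try:
    msgpack.packb((1712059200000000000, "Europe/London"), use_bin_type=True, strict_types=True, default=default)
except TypeError as exc:
    print(exc)  # would fall back to pickle for tuple
```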
@@ -913,75 +907,22 @@ def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_col
)


def check_valid_msgpack(pack_module, packer_module):
if (pack_module in ("msgpack", "msgpack.fallback")) and (
packer_module in ("msgpack._packer", "msgpack.fallback", "msgpack._cmsgpack")
):
return
log.info("Unsupported msgpack variant, got: {}, {}".format(pack_module, packer_module))
raise ArcticNativeException("Unsupported msgpack variant, got: {}, {}".format(pack_module, packer_module))


def _pack(*args, **kwargs):
from msgpack import Packer

check_valid_msgpack(pack.__module__, Packer.__module__)

return pack(*args, **kwargs)


def _packb(*args, **kwargs):
from msgpack import Packer

check_valid_msgpack(packb.__module__, Packer.__module__)

return packb(*args, **kwargs)


class MsgPackNormalizer(Normalizer):
"""
Fall back plan for the time being to store arbitrary data
"""

MSG_PACK_MAX_SIZE = (1 << 32) + 1024
MMAP_DEFAULT_SIZE = (
MSG_PACK_MAX_SIZE # Allow up to 4 gib pickles in msgpack by default, most of these compress fairly well.
)
# msgpack only checks that the size of pickled data is within (1 << 32) - 1 bytes
# Extra memory is needed in mmap for msgpack's overhead

def __init__(self, cfg=None):
self._size = MsgPackNormalizer.MMAP_DEFAULT_SIZE if cfg is None else cfg.max_blob_size
self.MSG_PACK_MAX_SIZE = self._size # Override with the max_pickle size if set in config.
self.strict_mode = cfg.strict_mode if cfg is not None else False

def normalize(self, obj, **kwargs):
buffer = mmap(-1, self._size)
try:
return self._pack_with_buffer(obj, buffer)
except:
buffer.close()
raise

def _pack_with_buffer(self, obj, buffer: mmap):
try:
self._msgpack_pack(obj, buffer)
except ValueError as e:
if str(e) == "data out of range":
raise ArcticDbNotYetImplemented("Fallback normalized msgpack size cannot exceed {}B".format(self._size))
else:
raise
packed, nbytes = self._msgpack_padded_packb(obj)

norm_meta = NormalizationMetadata()
norm_meta.msg_pack_frame.version = 1

d, r = divmod(buffer.tell(), 8) # pack 8 by 8
size = d + int(r != 0)

norm_meta.msg_pack_frame.size_bytes = buffer.tell()
norm_meta.msg_pack_frame.size_bytes = nbytes

# FUTURE(#263): do we need to care about byte ordering?
column_val = np.array(memoryview(buffer[: size * 8]), np.uint8).view(np.uint64)
column_val = np.array(memoryview(packed), np.uint8).view(np.uint64)

return NormalizedInput(
item=NPDDataFrame(
Expand All @@ -1000,84 +941,57 @@ def denormalize(self, obj, meta):
raise ArcticNativeException("Expected msg_pack_frame input, actual {}".format(meta))
sb = meta.msg_pack_frame.size_bytes
col_data = obj.data[0].view(np.uint8)[:sb]
return self._msgpack_unpack(memoryview(col_data))

@staticmethod
def _nested_msgpack_packb(obj):
return _packb(obj, use_bin_type=True)

@staticmethod
def _nested_msgpack_unpackb(buff, raw=False):
return _msgpack_compat.unpackb(
buff,
raw=raw,
max_ext_len=MsgPackNormalizer.MSG_PACK_MAX_SIZE,
max_bin_len=MsgPackNormalizer.MSG_PACK_MAX_SIZE,
)
return self._msgpack_unpackb(memoryview(col_data))

def _custom_pack(self, obj):
if isinstance(obj, pd.Timestamp):
tz = obj.tz.zone if obj.tz is not None else None
return ExtType(MsgPackSerialization.PD_TIMESTAMP, MsgPackNormalizer._nested_msgpack_packb((obj.value, tz)))
return ExtType(MsgPackSerialization.PD_TIMESTAMP, packb([obj.value, tz]))

if isinstance(obj, datetime.datetime):
return ExtType(
MsgPackSerialization.PY_DATETIME, MsgPackNormalizer._nested_msgpack_packb(_to_tz_timestamp(obj))
)
return ExtType(MsgPackSerialization.PY_DATETIME, packb(_to_tz_timestamp(obj)))

if isinstance(obj, datetime.timedelta):
return ExtType(
MsgPackSerialization.PY_TIMEDELTA, MsgPackNormalizer._nested_msgpack_packb(pd.Timedelta(obj).value)
)
return ExtType(MsgPackSerialization.PY_TIMEDELTA, packb(pd.Timedelta(obj).value))

if self.strict_mode:
raise TypeError("Normalisation is running in strict mode, writing pickled data is disabled.")
else:
return ExtType(
MsgPackSerialization.PY_PICKLE_3,
MsgPackNormalizer._nested_msgpack_packb(Pickler.write(obj)),
)
return ExtType(MsgPackSerialization.PY_PICKLE_3, packb(Pickler.write(obj)))

def _ext_hook(self, code, data):
if code == MsgPackSerialization.PD_TIMESTAMP:
data = MsgPackNormalizer._nested_msgpack_unpackb(data)
data = unpackb(data, raw=False)
return pd.Timestamp(data[0], tz=data[1]) if data[1] is not None else pd.Timestamp(data[0])

if code == MsgPackSerialization.PY_DATETIME:
data = MsgPackNormalizer._nested_msgpack_unpackb(data)
data = unpackb(data, raw=False)
return _from_tz_timestamp(data[0], data[1])

if code == MsgPackSerialization.PY_TIMEDELTA:
data = MsgPackNormalizer._nested_msgpack_unpackb(data)
data = unpackb(data, raw=False)
return pd.Timedelta(data).to_pytimedelta()

if code == MsgPackSerialization.PY_PICKLE_2:
# If stored in Python2 we want to use raw while unpacking.
# https://github.com/msgpack/msgpack-python/blob/master/msgpack/_unpacker.pyx#L230
data = MsgPackNormalizer._nested_msgpack_unpackb(data, raw=True)
data = unpackb(data, raw=True)
return Pickler.read(data, pickled_in_python2=True)

if code == MsgPackSerialization.PY_PICKLE_3:
data = MsgPackNormalizer._nested_msgpack_unpackb(data, raw=False)
data = unpackb(data, raw=False)
return Pickler.read(data, pickled_in_python2=False)

return ExtType(code, data)

def _msgpack_pack(self, obj, buff):
try:
_pack(obj, buff, use_bin_type=True, default=self._custom_pack, strict_types=True)
except TypeError:
# Some ancient versions of msgpack don't support strict_types, fallback to the pack without that arg.
_pack(obj, buff, use_bin_type=True, default=self._custom_pack)

def _msgpack_unpack(self, buff, raw=False):
return _msgpack_compat.unpackb(
buff,
raw=raw,
ext_hook=self._ext_hook,
max_ext_len=MsgPackNormalizer.MSG_PACK_MAX_SIZE,
max_bin_len=MsgPackNormalizer.MSG_PACK_MAX_SIZE,
)
def _msgpack_packb(self, obj):
return packb(obj, default=self._custom_pack)

def _msgpack_padded_packb(self, obj):
return padded_packb(obj, default=self._custom_pack)

def _msgpack_unpackb(self, buff, raw=False):
return unpackb(buff, raw=raw, ext_hook=self._ext_hook)
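Editorial sketch (not part of the diff): for readers unfamiliar with the ExtType round trip used by `_custom_pack` and `_ext_hook`, here is a minimal standalone pairing for `pd.Timestamp` against plain msgpack; the ext code 32 is made up for this sketch, the real one comes from `MsgPackSerialization.PD_TIMESTAMP`:

```python
import msgpack
import pandas as pd

PD_TIMESTAMP = 32  # hypothetical ext code for this sketch

def custom_pack(obj):
    if isinstance(obj, pd.Timestamp):
        tz = obj.tz.zone if obj.tz is not None else None
        # Nest the [value, tz] pair as its own msgpack payload inside the ExtType.
        return msgpack.ExtType(PD_TIMESTAMP, msgpack.packb([obj.value, tz], use_bin_type=True))
    raise TypeError(f"cannot pack {type(obj).__name__}")

def ext_hook(code, data):
    if code == PD_TIMESTAMP:
        value, tz = msgpack.unpackb(data, raw=False)
        return pd.Timestamp(value, tz=tz) if tz is not None else pd.Timestamp(value)
    return msgpack.ExtType(code, data)

ts = pd.Timestamp("2024-04-02 12:00", tz="Europe/London")
packed = msgpack.packb({"when": ts}, use_bin_type=True, strict_types=True, default=custom_pack)
print(msgpack.unpackb(packed, raw=False, ext_hook=ext_hook))  # {'when': Timestamp('2024-04-02 12:00:00+0100', tz='Europe/London')}
```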


class Pickler(object):
@@ -1279,7 +1193,8 @@ def denormalize(self, item, norm_meta):
normalize = _NORMALIZER.normalize
denormalize = _NORMALIZER.denormalize

_MAX_USER_DEFINED_META = 16 << 20
_MAX_USER_DEFINED_META = 16 << 20 # 16MB
_WARN_USER_DEFINED_META = 8 << 20 # 8MB


def _init_msgpack_metadata():
@@ -1291,34 +1206,34 @@ def _init_msgpack_metadata():
_msgpack_metadata = _init_msgpack_metadata()


# TODO: Fix the calls to msgpack functions to make the code nicer
def normalize_metadata(d):
# type: (Mapping[string, Any])->NormalizationMetadata.UserDefinedMetadata
m = mmap(-1, _MAX_USER_DEFINED_META)
# this trick is used to prevent arbitrary large object serialization for now
# pretty defensive, but some people stuff numpy arrays of 10^6 elements in there...
# it is going to slow down the whole indexing read side

# Prevent arbitrary large object serialization
# as it will slow down the indexing read side
# which is not a good idea.
# A subsequent improvement could remove that limitation
# using an extra indirection and point to the blob key
# However, this is also a probable sign of poor data modelling
# and understanding the need should be a priority before
# removing this protection.
try:
_msgpack_metadata._msgpack_pack(d, m)
except ValueError:
raise ArcticDbNotYetImplemented("User defined metadata cannot exceed {}B".format(_MAX_USER_DEFINED_META))
packed = _msgpack_metadata._msgpack_packb(d)
size = len(packed)
if size > _MAX_USER_DEFINED_META:
raise ArcticDbNotYetImplemented(f'User defined metadata cannot exceed {_MAX_USER_DEFINED_META}B')
if size > _WARN_USER_DEFINED_META:
log.warn(f'User defined metadata is above warning size ({_WARN_USER_DEFINED_META}B), metadata cannot exceed {_MAX_USER_DEFINED_META}B. Current size: {size}B.')

udm = UserDefinedMetadata()
udm.inline_payload = memoryview(m[: m.tell()]).tobytes()
udm.inline_payload = packed
return udm


def denormalize_user_metadata(udm, ext_obj=None):
# type: (NormalizationMetadata.UserDefinedMetadata, Optional[buffer])->Mapping[string,Any]
storage_type = udm.WhichOneof("storage_type")
if storage_type == "inline_payload":
return _msgpack_metadata._msgpack_unpack(udm.inline_payload)
return _msgpack_metadata._msgpack_unpackb(udm.inline_payload)
elif storage_type is None:
return None
else:
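Editorial sketch (not part of the diff): to put the new limits in perspective, `8 << 20` is 8 MiB and `16 << 20` is 16 MiB, so metadata between those sizes is stored but warned about, and anything above 16 MiB is rejected:

```python
# Standalone illustration of the thresholds used by normalize_metadata above.
import msgpack

_MAX_USER_DEFINED_META = 16 << 20   # 16 MiB hard limit -> ArcticDbNotYetImplemented
_WARN_USER_DEFINED_META = 8 << 20   # 8 MiB soft limit  -> log.warn

packed = msgpack.packb({"blob": b"x" * (9 << 20)}, use_bin_type=True)
size = len(packed)

print(size > _WARN_USER_DEFINED_META)   # True  -> a warning would be emitted
print(size > _MAX_USER_DEFINED_META)    # False -> still accepted and stored inline
```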