Feature flag empty index in the python layer
Vasil Pashov committed Apr 5, 2024
1 parent e6c4f28 commit 55c0716
Showing 3 changed files with 65 additions and 17 deletions.
31 changes: 19 additions & 12 deletions python/arcticdb/version_store/_normalization.py
@@ -358,7 +358,7 @@ def _ensure_str_timezone(index_tz):
return index_tz


- def _denormalize_single_index(item, norm_meta):
+ def _denormalize_single_index(item, norm_meta, empty_types):
# item: np.ndarray -> pd.Index()
rtn = Index([])
if len(item.index_columns) == 0:
@@ -370,9 +370,9 @@ def _denormalize_single_index(item, norm_meta):
name = norm_meta.index.name if norm_meta.index.name else None
return RangeIndex(start=norm_meta.index.start, stop=stop, step=norm_meta.index.step, name=name)
else:
- return Index([])
+ return None if empty_types else Index([])
else:
- return Index([])
+ return RangeIndex(start=0, stop=0, step=1) if empty_types else Index([])
# this means that the index is not a datetime index and it's been represented as a regular field in the stream
item.index_columns.append(item.names.pop(0))
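
The two rewritten returns are easiest to see side by side. A minimal sketch (not from the repository; the collapsed branch structure is approximated) of the behaviour the flag toggles:

```python
import pandas as pd

# Minimal sketch (approximating the collapsed branches) of the two changed
# returns: with the flag off, both paths keep returning pd.Index([]).
def _empty_index_sketch(empty_types: bool, as_range: bool):
    if as_range:
        # Flag on: a canonical empty RangeIndex instead of a generic Index.
        return pd.RangeIndex(start=0, stop=0, step=1) if empty_types else pd.Index([])
    # Flag on: None, letting the denormalized frame take pandas' default index.
    return None if empty_types else pd.Index([])
```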

@@ -523,9 +523,10 @@ def denormalize(self, item, norm_meta):


class _PandasNormalizer(Normalizer):
- def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len):
+ def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len, empty_types):
index = df.index
- if len(index) == 0 and len(df.select_dtypes(include="category").columns) == 0:
+ empty_index = len(index) == 0 and len(df.select_dtypes(include="category").columns) == 0
+ if empty_index and empty_types:
index_norm = pd_norm.index
index_norm.is_physically_stored = False
index = Index([])
@@ -551,11 +552,15 @@ def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len):
index = df.index
else:
is_not_range_index = not isinstance(index, RangeIndex)
- df_has_rows = not(len(index) == 0 and len(df.select_dtypes(include="category").columns) == 0)
index_norm = pd_norm.index
- index_norm.is_physically_stored = is_not_range_index and df_has_rows
- if not df_has_rows:
-     index = Index([])
+ if empty_types:
+     index_norm.is_physically_stored = is_not_range_index and not empty_index
+     if empty_index:
+         index = Index([])
+ else:
+     if IS_PANDAS_TWO and isinstance(index, RangeIndex) and empty_index:
+         index = DatetimeIndex([])
+     index_norm.is_physically_stored = not isinstance(index, RangeIndex)

return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len)
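
The new `empty_index` predicate counts a frame as empty only when it has zero rows and no categorical columns, and the flag-off branch pins down how pandas 2 empty frames are normalized. A small illustration, with `IS_PANDAS_TWO` assumed to mirror the repo's constant:

```python
import pandas as pd

IS_PANDAS_TWO = pd.__version__.startswith("2.")  # assumed stand-in for the repo's constant

df = pd.DataFrame({"a": []})
empty_index = len(df.index) == 0 and len(df.select_dtypes(include="category").columns) == 0

index = df.index
if IS_PANDAS_TWO and isinstance(index, pd.RangeIndex) and empty_index:
    # Flag-off path: pandas >= 2.0 gives empty frames a RangeIndex by
    # default; converting to an empty DatetimeIndex keeps the normalized
    # form consistent with what was written under pandas < 2.0.
    index = pd.DatetimeIndex([])
```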

@@ -843,7 +848,7 @@ def _denormalize_multi_index(df: pd.DataFrame, norm_meta: NormalizationMetadata.

return df

- def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs):
+ def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
# type: (DataFrame, Optional[int])->NormalizedInput
norm_meta = NormalizationMetadata()
norm_meta.df.common.mark = True
@@ -1173,7 +1178,7 @@ def __init__(self, fallback_normalizer=None, use_norm_failure_handler_known_type
self.msg_pack_denorm = MsgPackNormalizer() # must exist for deserialization
self.fallback_normalizer = fallback_normalizer

- def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs):
+ def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
normalizer = self.get_normalizer_for_type(item)

if not normalizer:
@@ -1185,6 +1190,7 @@ def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_co
string_max_len=string_max_len,
dynamic_strings=dynamic_strings,
coerce_columns=coerce_columns,
+ empty_types=empty_types,
**kwargs,
)

@@ -1222,7 +1228,7 @@ def get_normalizer_for_type(self, item):
return None

def normalize(
- self, item, string_max_len=None, pickle_on_failure=False, dynamic_strings=False, coerce_columns=None, **kwargs
+ self, item, string_max_len=None, pickle_on_failure=False, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs
):
"""
:param item: Item to be normalized to something Arctic Native understands.
@@ -1238,6 +1244,7 @@ def normalize(
string_max_len=string_max_len,
dynamic_strings=dynamic_strings,
coerce_columns=coerce_columns,
+ empty_types=empty_types,
**kwargs,
)
except Exception as ex:
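
Taken together, the _normalization.py changes thread one keyword through every layer, defaulting to `False` everywhere. A hedged sketch of driving the composite normalizer directly (default construction and the keyword usage are assumptions based on the signatures above):

```python
import pandas as pd
from arcticdb.version_store._normalization import CompositeNormalizer

# Default construction and the keyword are assumptions based on the
# signatures in this diff; the flag defaults to False at every layer.
norm = CompositeNormalizer()
frame = pd.DataFrame({"col": []})

legacy = norm.normalize(frame)                     # empty_types defaults to False
flagged = norm.normalize(frame, empty_types=True)  # explicit opt-in
```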
46 changes: 42 additions & 4 deletions python/arcticdb/version_store/_store.py
@@ -326,6 +326,7 @@ def _try_normalize(
pickle_on_failure,
dynamic_strings,
coerce_columns,
+ empty_types,
norm_failure_options_msg="",
**kwargs
):
@@ -347,6 +348,7 @@ def _try_normalize(
dynamic_strings=dynamic_strings,
coerce_columns=coerce_columns,
dynamic_schema=dynamic_schema,
+ empty_types=empty_types,
**kwargs,
)
except ArcticDbNotYetImplemented as ex:
@@ -371,7 +373,7 @@ def _try_normalize(
return udm, item, norm_meta

def _try_flatten_and_write_composite_object(
- self, symbol, data, metadata, pickle_on_failure, dynamic_strings, prune_previous
+ self, symbol, data, metadata, pickle_on_failure, dynamic_strings, prune_previous, empty_types
):
fl = Flattener()
if fl.can_flatten(data):
@@ -383,7 +385,15 @@ def _try_flatten_and_write_composite_object(
# need to go through a multi key process as the msgpack normalizer can handle it as is.
if len(to_write) > 0:
for k, v in to_write.items():
- _, item, norm_meta = self._try_normalize(k, v, None, pickle_on_failure, dynamic_strings, None)
+ _, item, norm_meta = self._try_normalize(
+     k,
+     v,
+     None,
+     pickle_on_failure,
+     dynamic_strings,
+     None,
+     empty_types
+ )
items.append(item)
norm_metas.append(norm_meta)
normalized_udm = normalize_metadata(metadata) if metadata is not None else None
@@ -538,6 +548,9 @@ def write(
recursive_normalizers = self.resolve_defaults(
"recursive_normalizers", proto_cfg, global_default=False, uppercase=False, **kwargs
)
+ empty_types = self.resolve_defaults(
+     "empty_types", proto_cfg, global_default=False, uppercase=False, **kwargs
+ )
parallel = self.resolve_defaults("parallel", proto_cfg, global_default=False, uppercase=False, **kwargs)
incomplete = self.resolve_defaults("incomplete", proto_cfg, global_default=False, uppercase=False, **kwargs)

@@ -561,7 +574,13 @@
# Do a multi_key write if the structured is nested and is not trivially normalizable via msgpack.
if recursive_normalizers:
vit = self._try_flatten_and_write_composite_object(
- symbol, data, metadata, pickle_on_failure, dynamic_strings, prune_previous_version
+ symbol,
+ data,
+ metadata,
+ pickle_on_failure,
+ dynamic_strings,
+ prune_previous_version,
+ empty_types
)
if isinstance(vit, VersionedItem):
return vit
@@ -573,6 +592,7 @@
pickle_on_failure,
dynamic_strings,
coerce_columns,
+ empty_types,
norm_failure_options_msg,
)
# TODO: allow_sparse for write_parallel / recursive normalizers as well.
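
Since `write` resolves the flag with `resolve_defaults`, an explicit kwarg beats the library's `write_options`, which beat the global default of `False`. A hedged usage sketch, with `lib` assumed to be the `NativeVersionStore` whose `write` is shown above:

```python
import pandas as pd

# Hedged usage sketch; `lib` is assumed to be the NativeVersionStore whose
# write() is shown above.  An explicit kwarg wins, then the library's
# write_options, then the global default of False.
lib.write("sym", pd.DataFrame(), empty_types=True)   # per-call opt-in
lib.write("sym", pd.DataFrame())                     # library config, else off
```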
@@ -685,7 +705,9 @@ def append(

dynamic_strings = self._resolve_dynamic_strings(kwargs)
coerce_columns = kwargs.get("coerce_columns", None)

empty_types = self.resolve_defaults(
"empty_types", self._lib_cfg.lib_desc.version.write_options, False, **kwargs
)
_handle_categorical_columns(symbol, dataframe)

udm, item, norm_meta = self._try_normalize(
@@ -695,6 +717,7 @@ def append(
False,
dynamic_strings,
coerce_columns,
+ empty_types,
self.norm_failure_options_msg_append,
)

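`append` resolves the same flag from the library's write options. A sketch of the intended round trip; the type-filling behaviour on first non-empty append is an assumption about the feature, not something shown in this diff:

```python
import pandas as pd

# Assumed feature behaviour (not shown in this diff): an empty write leaves
# the concrete column/index types open, and the first non-empty append
# fixes them.  `lib` as in the write() sketch above.
lib.write("sym", pd.DataFrame({"col": []}), empty_types=True)
lib.append("sym", pd.DataFrame({"col": [1.0]}, index=pd.DatetimeIndex(["2024-01-01"])))
```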
@@ -783,6 +806,9 @@ def update(
dynamic_schema = self.resolve_defaults(
"dynamic_schema", self._lib_cfg.lib_desc.version.write_options, False, **kwargs
)
+ empty_types = self.resolve_defaults(
+     "empty_types", self._lib_cfg.lib_desc.version.write_options, False, **kwargs
+ )
coerce_columns = kwargs.get("coerce_columns", None)

if date_range is not None:
@@ -799,6 +825,7 @@ def update(
False,
dynamic_strings,
coerce_columns,
+ empty_types,
self.norm_failure_options_msg_update,
)

@@ -1235,6 +1262,9 @@ def _batch_write_internal(
pickle_on_failure = self.resolve_defaults(
"pickle_on_failure", proto_cfg, global_default=False, existing_value=pickle_on_failure, **kwargs
)
+ empty_types = self.resolve_defaults(
+     "empty_types", proto_cfg, global_default=False, existing_value=proto_cfg.empty_types, **kwargs
+ )
norm_failure_options_msg = kwargs.get("norm_failure_options_msg", self.norm_failure_options_msg_write)

# metadata_vector used to be type-hinted as an Iterable, so handle this case in case anyone is relying on it
@@ -1254,6 +1284,7 @@ def _batch_write_internal(
pickle_on_failure,
dynamic_strings,
None,
+ empty_types,
norm_failure_options_msg,
)
for idx in range(len(symbols))
@@ -1396,6 +1427,9 @@ def _batch_append_to_versioned_items(
prune_previous_version = self.resolve_defaults(
"prune_previous_version", proto_cfg, global_default=False, existing_value=prune_previous_version
)
+ empty_types = self.resolve_defaults(
+     "empty_types", proto_cfg, global_default=False, existing_value=proto_cfg.empty_types
+ )
dynamic_strings = self._resolve_dynamic_strings(kwargs)

# metadata_vector used to be type-hinted as an Iterable, so handle this case in case anyone is relying on it
@@ -1415,6 +1449,7 @@ def _batch_append_to_versioned_items(
False,
dynamic_strings,
None,
+ empty_types,
self.norm_failure_options_msg_append,
)
for idx in range(len(symbols))
@@ -2392,13 +2427,16 @@ def get_normalizer_for_item(self, item):

def will_item_be_pickled(self, item):
try:
+ proto_cfg = self._lib_cfg.lib_desc.version.write_options
+ empty_types = self.resolve_defaults("empty_types", proto_cfg, global_default=False, uppercase=False)
_udm, _item, norm_meta = self._try_normalize(
symbol="",
dataframe=item,
metadata=None,
pickle_on_failure=False,
dynamic_strings=False, # TODO: Enable it when on by default.
coerce_columns=None,
+ empty_types=empty_types
)
except Exception:
# This will also log the exception inside composite normalizer's normalize
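
`will_item_be_pickled` normalizes outside any write path, so it must resolve the flag itself. A hedged expectation check (`lib` assumed as in the earlier sketches):

```python
import pandas as pd

# Hedged expectation: an empty DataFrame should still normalize rather than
# fall back to pickling, whichever value empty_types resolves to.
assert not lib.will_item_be_pickled(pd.DataFrame())  # `lib` assumed as above
```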
5 changes: 4 additions & 1 deletion python/tests/unit/arcticdb/version_store/test_deletion.py
@@ -227,7 +227,10 @@ def write_specific_version(data, version):
proto_cfg = lib._lib_cfg.lib_desc.version.write_options
dynamic_strings = lib.resolve_defaults("dynamic_strings", proto_cfg, False)
pickle_on_failure = lib.resolve_defaults("pickle_on_failure", proto_cfg, False)
- udm, item, norm_meta = lib._try_normalize(sym, data, None, pickle_on_failure, dynamic_strings, None)
+ empty_types = lib.resolve_defaults("empty_types", proto_cfg, False)
+ udm, item, norm_meta = lib._try_normalize(
+     sym, data, None, pickle_on_failure, dynamic_strings, None, empty_types
+ )
if isinstance(item, NPDDataFrame):
lib.version_store.write_dataframe_specific_version(sym, item, norm_meta, udm, version)

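Other low-level tests that call `_try_normalize` would follow the same pattern as the helper above; a sketch under that assumption:

```python
import pandas as pd

# Sketch for other low-level tests, mirroring the helper above; `lib` and
# `proto_cfg` follow the same setup as in write_specific_version.
proto_cfg = lib._lib_cfg.lib_desc.version.write_options
empty_types = lib.resolve_defaults("empty_types", proto_cfg, False)
# empty_types is positional, immediately before norm_failure_options_msg.
udm, item, norm_meta = lib._try_normalize(
    "sym", pd.DataFrame(), None, False, False, None, empty_types
)
```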
