diff --git a/cpp/arcticdb/python/python_to_tensor_frame.cpp b/cpp/arcticdb/python/python_to_tensor_frame.cpp
index f1a54fd8d5..fce30d3020 100644
--- a/cpp/arcticdb/python/python_to_tensor_frame.cpp
+++ b/cpp/arcticdb/python/python_to_tensor_frame.cpp
@@ -116,7 +116,7 @@ NativeTensor obj_to_tensor(PyObject *ptr, bool empty_types) {
     const auto arr = pybind11::detail::array_proxy(ptr);
     const auto descr = pybind11::detail::array_descriptor_proxy(arr->descr);
     auto ndim = arr->nd;
-    ssize_t size = ndim == 1 ? arr->dimensions[0] : arr->dimensions[0] * arr->dimensions[1];
+    const ssize_t size = ndim == 1 ? arr->dimensions[0] : arr->dimensions[0] * arr->dimensions[1];
     // In Pandas < 2, empty series dtype is `"float"`, but as of Pandas 2.0, empty series dtype is `"object"`
     // The Normalizer in Python cast empty `"float"` series to `"object"` so `EMPTY` is used here.
     // See: https://github.com/man-group/ArcticDB/pull/1049
@@ -264,17 +264,16 @@ std::shared_ptr py_ndf_to_frame(
     // idx_names are passed by the python layer. They are empty in case row count index is used see:
     // https://github.com/man-group/ArcticDB/blob/4184a467d9eee90600ddcbf34d896c763e76f78f/python/arcticdb/version_store/_normalization.py#L291
-    // Currently the python layers assign RowRange index to both empty dataframes and dataframes wich do not specify
+    // Currently the python layers assign RowRange index to both empty dataframes and dataframes which do not specify
     // index explicitly. Thus we handle this case after all columns are read so that we know how many rows are there.
     if (idx_names.empty()) {
-        if (res->num_rows > 0) {
-            res->index = stream::RowCountIndex();
-            res->desc.set_index_type(IndexDescriptor::ROWCOUNT);
-        } else {
-            res->index = stream::EmptyIndex();
-            res->desc.set_index_type(IndexDescriptor::EMPTY);
-        }
+        res->index = stream::RowCountIndex();
+        res->desc.set_index_type(IndexDescriptor::ROWCOUNT);
+    }
+    if (empty_types && res->num_rows == 0) {
+        res->index = stream::EmptyIndex();
+        res->desc.set_index_type(IndexDescriptor::EMPTY);
     }

     ARCTICDB_DEBUG(log::version(), "Received frame with descriptor {}", res->desc);
diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp
index 244b7c8fae..c88a3e3908 100644
--- a/cpp/arcticdb/version/local_versioned_engine.cpp
+++ b/cpp/arcticdb/version/local_versioned_engine.cpp
@@ -587,7 +587,8 @@ VersionedItem LocalVersionedEngine::update_internal(
         query,
         frame,
         get_write_options(),
-        dynamic_schema);
+        dynamic_schema,
+        cfg().write_options().empty_types());
     write_version_and_prune_previous(
         prune_previous_versions, versioned_item.key_, update_info.previous_index_key_);
     return versioned_item;
@@ -1398,7 +1399,8 @@ VersionedItem LocalVersionedEngine::append_internal(
         update_info,
         frame,
         get_write_options(),
-        validate_index);
+        validate_index,
+        cfg().write_options().empty_types());
     write_version_and_prune_previous(
         prune_previous_versions, versioned_item.key_, update_info.previous_index_key_);
     return versioned_item;
@@ -1446,7 +1448,7 @@ std::vector> LocalVersionedEngine::batch_
     auto index_key_fut = folly::Future::makeEmpty();
     auto write_options = get_write_options();
     if (update_info.previous_index_key_.has_value()) {
-        index_key_fut = async_append_impl(store(), update_info, frame, write_options, validate_index);
+        index_key_fut = async_append_impl(store(), update_info, frame, write_options, validate_index, cfg().write_options().empty_types());
    } else {
        missing_data::check(
            upsert,
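Taken together, the hunks above change how index-less frames are classified: the Python layer now always assigns a row-count index, and only when the library's `empty_types` write option is enabled and the frame has zero rows is the index recorded as `EMPTY`. A minimal sketch of the observable behavior, assuming a local LMDB-backed library created through the public `Arctic` API (the URI and library name are illustrative, and `empty_types` is assumed to be off by default, as this diff targets):

```python
import pandas as pd
from arcticdb import Arctic

ac = Arctic("lmdb://./arctic_demo")
lib = ac.get_library("demo", create_if_missing=True)

# An index-less, non-empty frame is normalized with a row-count index
# (IndexDescriptor::ROWCOUNT).
lib.write("sym", pd.DataFrame({"col": [1.0, 2.0]}))

# An index-less, zero-row frame: with empty_types disabled it also gets a
# row-count index; with empty_types enabled the C++ layer records an EMPTY
# index instead, which is treated as compatible with any index type on
# subsequent appends.
lib.write("sym_empty", pd.DataFrame({"col": []}))
```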
diff --git a/cpp/arcticdb/version/schema_checks.hpp b/cpp/arcticdb/version/schema_checks.hpp
index 004bd14b3a..9b81d54058 100644
--- a/cpp/arcticdb/version/schema_checks.hpp
+++ b/cpp/arcticdb/version/schema_checks.hpp
@@ -42,42 +42,68 @@ inline IndexDescriptor::Type get_common_index_type(const IndexDescriptor::Type&
     return IndexDescriptor::UNKNOWN;
 }

-inline void check_normalization_index_match(NormalizationOperation operation,
-                                            const StreamDescriptor &old_descriptor,
-                                            const pipelines::InputTensorFrame &frame) {
+inline void check_normalization_index_match(
+        NormalizationOperation operation,
+        const StreamDescriptor& old_descriptor,
+        const pipelines::InputTensorFrame& frame,
+        bool empty_types
+) {
     const IndexDescriptor::Type old_idx_kind = old_descriptor.index().type();
     const IndexDescriptor::Type new_idx_kind = frame.desc.index().type();
     if (operation == UPDATE) {
         const bool new_is_timeseries = std::holds_alternative(frame.index);
         util::check_rte(
             (old_idx_kind == IndexDescriptor::TIMESTAMP || old_idx_kind == IndexDescriptor::EMPTY) && new_is_timeseries,
-            "Update will not work as expected with a non-timeseries index");
+            "Update will not work as expected with a non-timeseries index"
+        );
     } else {
         const IndexDescriptor::Type common_index_type = get_common_index_type(old_idx_kind, new_idx_kind);
-        normalization::check(
-            common_index_type != IndexDescriptor::UNKNOWN,
-            "Cannot append {} index to {} index",
-            index_type_to_str(new_idx_kind),
-            index_type_to_str(old_idx_kind)
-        );
+        if (empty_types) {
+            normalization::check(
+                common_index_type != IndexDescriptor::UNKNOWN,
+                "Cannot append {} index to {} index",
+                index_type_to_str(new_idx_kind),
+                index_type_to_str(old_idx_kind)
+            );
+        } else {
+            // (old_idx_kind == IndexDescriptor::TIMESTAMP && new_idx_kind == IndexDescriptor::ROWCOUNT) is left to preserve
+            // pre-empty-index behavior with pandas 2, see test_empty_writes.py::test_append_empty_series. Empty pd.Series
+            // have a RowRange index, but due to: https://github.com/man-group/ArcticDB/blob/bd1776291fe402d8b18af9fea865324ebd7705f1/python/arcticdb/version_store/_normalization.py#L545
+            // it gets converted to DatetimeIndex (all empty indexes except categorical and multiindex are converted to a
+            // datetime index in pandas 2 if the empty index type is disabled); however, we still want to be able to append
+            // pd.Series to empty pd.Series. Note that this exception still does not allow appending RowCount-indexed
+            // pd.DataFrames to DatetimeIndex-indexed pd.DataFrames, because they would have a different field count
+            // (the row-count index is not stored as a field). This logic is bug-prone and will improve once the empty
+            // index type is enabled.
+            normalization::check(
+                common_index_type != IndexDescriptor::UNKNOWN ||
+                    (old_idx_kind == IndexDescriptor::TIMESTAMP && new_idx_kind == IndexDescriptor::ROWCOUNT),
+                "Cannot append {} index to {} index",
+                index_type_to_str(new_idx_kind),
+                index_type_to_str(old_idx_kind)
+            );
+        }
     }
 }

-inline bool columns_match(const StreamDescriptor& df_in_store_descriptor, const StreamDescriptor& new_df_descriptor) {
-    const int right_fields_offset = df_in_store_descriptor.index().type() == IndexDescriptor::EMPTY ? new_df_descriptor.index().field_count() : 0;
+inline bool columns_match(
+        const StreamDescriptor& df_in_store_descriptor,
+        const StreamDescriptor& new_df_descriptor
+) {
+    const int index_field_size =
+        df_in_store_descriptor.index().type() == IndexDescriptor::EMPTY ? new_df_descriptor.index().field_count() : 0;
     // The empty index is compatible with all other index types. Differences in the index fields in this case are
     // allowed. The index fields are always the first in the list.
-    if (df_in_store_descriptor.fields().size() + right_fields_offset != new_df_descriptor.fields().size()) {
+    if (df_in_store_descriptor.fields().size() + index_field_size != new_df_descriptor.fields().size()) {
         return false;
     }
     // In case the left index is empty index we want to skip name/type checking of the index fields which are always
     // the first fields.
     for (auto i = 0; i < int(df_in_store_descriptor.fields().size()); ++i) {
-        if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i + right_fields_offset).name())
+        if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i + index_field_size).name())
             return false;
         const TypeDescriptor& left_type = df_in_store_descriptor.fields(i).type();
-        const TypeDescriptor& right_type = new_df_descriptor.fields(i + right_fields_offset).type();
+        const TypeDescriptor& right_type = new_df_descriptor.fields(i + index_field_size).type();

         if (!trivially_compatible_types(left_type, right_type) &&
             !(is_empty_type(left_type.data_type()) || is_empty_type(right_type.data_type())))
@@ -90,9 +116,10 @@ inline void fix_descriptor_mismatch_or_throw(
     NormalizationOperation operation,
     bool dynamic_schema,
     const pipelines::index::IndexSegmentReader &existing_isr,
-    const pipelines::InputTensorFrame &new_frame) {
+    const pipelines::InputTensorFrame &new_frame,
+    bool empty_types) {
     const auto &old_sd = existing_isr.tsd().as_stream_descriptor();
-    check_normalization_index_match(operation, old_sd, new_frame);
+    check_normalization_index_match(operation, old_sd, new_frame, empty_types);

     if (dynamic_schema)
         return; // TODO: dynamic schema may need some of the checks as below
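The TIMESTAMP/ROWCOUNT carve-out above preserves one concrete pre-existing workflow: appending a default-indexed (row-count) series to a series that was written empty and therefore stored with a datetime index on Pandas 2. A hedged sketch of that case, modeled on `test_empty_writes.py::test_append_empty_series` (library setup is illustrative, `empty_types` disabled):

```python
import pandas as pd
from arcticdb import Arctic

lib = Arctic("lmdb://./arctic_demo").get_library("demo", create_if_missing=True)

lib.write("s", pd.Series([]))            # on Pandas 2 the empty index round-trips as a datetime index
lib.append("s", pd.Series([1.0, 2.0]))   # default RangeIndex -> ROWCOUNT; permitted only because of the
                                         # TIMESTAMP/ROWCOUNT exception checked above
```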
diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp
index 6518105283..f27a53b830 100644
--- a/cpp/arcticdb/version/version_core.cpp
+++ b/cpp/arcticdb/version/version_core.cpp
@@ -137,7 +137,8 @@ folly::Future async_append_impl(
     const UpdateInfo& update_info,
     const std::shared_ptr& frame,
     const WriteOptions& options,
-    bool validate_index) {
+    bool validate_index,
+    bool empty_types) {
     util::check(update_info.previous_index_key_.has_value(), "Cannot append as there is no previous index key to append to");
     const StreamId stream_id = frame->desc.id();
@@ -147,7 +148,7 @@ folly::Future async_append_impl(
     auto row_offset = index_segment_reader.tsd().proto().total_rows();
     util::check_rte(!index_segment_reader.is_pickled(), "Cannot append to pickled data");
     frame->set_offset(static_cast(row_offset));
-    fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame);
+    fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame, empty_types);
     if (validate_index) {
         sorted_data_check_append(*frame, index_segment_reader);
     }
@@ -162,14 +163,16 @@ VersionedItem append_impl(
     const UpdateInfo& update_info,
     const std::shared_ptr& frame,
     const WriteOptions& options,
-    bool validate_index) {
+    bool validate_index,
+    bool empty_types) {
     ARCTICDB_SUBSAMPLE_DEFAULT(WaitForWriteCompletion)
     auto version_key_fut = async_append_impl(store,
         update_info,
         frame,
         options,
-        validate_index);
+        validate_index,
+        empty_types);
     auto version_key = std::move(version_key_fut).get();
     auto versioned_item = VersionedItem(to_atom(std::move(version_key)));
     ARCTICDB_DEBUG(log::version(), "write_dataframe_impl stream_id: {} , version_id: {}", versioned_item.symbol(),
         update_info.next_version_id_);
@@ -323,7 +326,8 @@ VersionedItem update_impl(
     const UpdateQuery& query,
     const std::shared_ptr& frame,
     const WriteOptions&& options,
-    bool dynamic_schema) {
+    bool dynamic_schema,
+    bool empty_types) {
     util::check(update_info.previous_index_key_.has_value(), "Cannot update as there is no previous index key to update into");
     const StreamId stream_id = frame->desc.id();
     ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", stream_id, update_info.previous_index_key_->version_id());
@@ -337,7 +341,7 @@ VersionedItem update_impl(
     sorted_data_check_update(*frame, index_segment_reader);
     bool bucketize_dynamic = index_segment_reader.bucketize_dynamic();
     (void)check_and_mark_slices(index_segment_reader, dynamic_schema, false, std::nullopt, bucketize_dynamic);
-    fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame);
+    fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame, empty_types);
     std::vector> queries =
         build_update_query_filters(query.row_filter, frame->index, frame->index_range, dynamic_schema, index_segment_reader.bucketize_dynamic());
diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp
index c7d488b2a2..9247c61136 100644
--- a/cpp/arcticdb/version/version_core.hpp
+++ b/cpp/arcticdb/version/version_core.hpp
@@ -54,14 +54,16 @@ folly::Future async_append_impl(
     const UpdateInfo& update_info,
     const std::shared_ptr& frame,
     const WriteOptions& options,
-    bool validate_index);
+    bool validate_index,
+    bool empty_types);

 VersionedItem append_impl(
     const std::shared_ptr& store,
     const UpdateInfo& update_info,
     const std::shared_ptr& frame,
     const WriteOptions& options,
-    bool validate_index);
+    bool validate_index,
+    bool empty_types);

 VersionedItem update_impl(
     const std::shared_ptr& store,
@@ -69,7 +71,8 @@ VersionedItem update_impl(
     const UpdateQuery & query,
     const std::shared_ptr& frame,
     const WriteOptions&& options,
-    bool dynamic_schema);
+    bool dynamic_schema,
+    bool empty_types);

 VersionedItem delete_range_impl(
     const std::shared_ptr& store,
diff --git a/python/arcticdb/version_store/_normalization.py b/python/arcticdb/version_store/_normalization.py
index 63c7425395..38879f747e 100644
--- a/python/arcticdb/version_store/_normalization.py
+++ b/python/arcticdb/version_store/_normalization.py
@@ -283,10 +283,13 @@ def _from_tz_timestamp(ts, tz):

 _range_index_props_are_public = hasattr(RangeIndex, "start")

-def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None, string_max_len=None):
+def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None, string_max_len=None, empty_types=False):
     # index: pd.Index or np.ndarray -> np.ndarray
     index_tz = None
-    if isinstance(index_norm, NormalizationMetadata.PandasIndex) and not index_norm.is_physically_stored:
+    is_empty = len(index) == 0
+    if empty_types and is_empty and not index_norm.is_physically_stored:
+        return [], []
+    elif isinstance(index, RangeIndex):
         if index.name:
             if not isinstance(index.name, int) and not isinstance(index.name, str):
                 raise NormalizationException(
@@ -295,10 +298,8 @@ def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None
             if isinstance(index.name, int):
                 index_norm.is_int = True
             index_norm.name = str(index.name)
-        if isinstance(index, RangeIndex):
-            # skip index since we can reconstruct it, so no need to actually store it
-            index_norm.start = index.start if _range_index_props_are_public else index._start
-            index_norm.step = index.step if _range_index_props_are_public else index._step
+        index_norm.start = index.start if _range_index_props_are_public else index._start
+        index_norm.step = index.step if _range_index_props_are_public else index._step
         return [], []
     else:
         coerce_type = DTN64_DTYPE if len(index) == 0 else None
@@ -364,9 +365,9 @@ def _denormalize_single_index(item, norm_meta):
             name = norm_meta.index.name if norm_meta.index.name else None
             return RangeIndex(start=norm_meta.index.start, stop=stop, step=norm_meta.index.step, name=name)
         else:
-            return Index([])
+            return DatetimeIndex([])
     else:
-        return Index([])
+        return RangeIndex(start=0, stop=0, step=1)

     # this means that the index is not a datetime index and it's been represented as a regular field in the stream
     item.index_columns.append(item.names.pop(0))
@@ -517,12 +518,13 @@ def denormalize(self, item, norm_meta):

 class _PandasNormalizer(Normalizer):
-    def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len):
+    def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len, empty_types):
         index = df.index
-        if len(index) == 0 and len(df.select_dtypes(include="category").columns) == 0:
+        empty_df = len(index) == 0
+        if empty_df and empty_types:
             index_norm = pd_norm.index
             index_norm.is_physically_stored = False
-            index = Index([])
+            index = DatetimeIndex([])
         elif isinstance(index, MultiIndex):
             # This is suboptimal and only a first implementation since it reduplicates the data
             index_norm = pd_norm.multi_index
@@ -544,14 +546,12 @@ class _PandasNormalizer(Normalizer):
             df.reset_index(fields, inplace=True)
             index = df.index
         else:
-            is_not_range_index = not isinstance(index, RangeIndex)
-            df_has_rows = not(len(index) == 0 and len(df.select_dtypes(include="category").columns) == 0)
             index_norm = pd_norm.index
-            index_norm.is_physically_stored = is_not_range_index and df_has_rows
-            if not df_has_rows:
-                index = Index([])
+            index_norm.is_physically_stored = not isinstance(index, RangeIndex) and not empty_df
+            is_categorical = len(df.select_dtypes(include="category").columns) > 0
+            index = DatetimeIndex([]) if IS_PANDAS_TWO and empty_df and not is_categorical else index

-        return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len)
+        return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len, empty_types=empty_types)

     def _index_from_records(self, item, norm_meta):
         # type: (NormalizationMetadata.Pandas, _SUPPORTED_NATIVE_RETURN_TYPES, Bool)->Union[Index, DatetimeIndex, MultiIndex]
@@ -581,12 +581,13 @@ class SeriesNormalizer(_PandasNormalizer):
     def __init__(self):
         self._df_norm = DataFrameNormalizer()

-    def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs):
+    def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
         df, norm = self._df_norm.normalize(
             item.to_frame(),
             dynamic_strings=dynamic_strings,
             string_max_len=string_max_len,
             coerce_columns=coerce_columns,
+            empty_types=empty_types
         )
         norm.series.CopyFrom(norm.df)
         if item.name:
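The `_denormalize_single_index` change above determines what callers observe when an index was not physically stored: Pandas >= 2 readers get an empty `DatetimeIndex`, earlier versions an empty `RangeIndex`. A small sketch of the round-trip (library setup illustrative, `empty_types` disabled):

```python
import pandas as pd
from arcticdb import Arctic

lib = Arctic("lmdb://./arctic_demo").get_library("demo", create_if_missing=True)

lib.write("empty", pd.DataFrame({"col": []}))
idx = lib.read("empty").data.index
# Pandas >= 2: DatetimeIndex([], dtype='datetime64[ns]', freq=None)
# Pandas < 2:  RangeIndex(start=0, stop=0, step=1)
```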
@@ -837,7 +838,7 @@ def _denormalize_multi_index(df: pd.DataFrame, norm_meta: NormalizationMetadata.
         return df

-    def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs):
+    def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
         # type: (DataFrame, Optional[int])->NormalizedInput
         norm_meta = NormalizationMetadata()
         norm_meta.df.common.mark = True
@@ -853,7 +854,7 @@ def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_col
             raise ArcticDbNotYetImplemented("MultiIndex column are not supported yet")

         index_names, ix_vals = self._index_to_records(
-            item, norm_meta.df.common, dynamic_strings, string_max_len=string_max_len
+            item, norm_meta.df.common, dynamic_strings, string_max_len=string_max_len, empty_types=empty_types
         )
         # The first branch of this if is faster, but does not work with null/duplicated column names
         if item.columns.is_unique and not item.columns.hasnans:
@@ -1009,13 +1010,13 @@ def write(obj):

 class TimeFrameNormalizer(Normalizer):
-    def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs):
+    def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
         norm_meta = NormalizationMetadata()
         norm_meta.ts.mark = True
         index_norm = norm_meta.ts.common.index
         index_norm.is_physically_stored = len(item.times) > 0 and not isinstance(item.times, RangeIndex)
         index_names, ix_vals = _normalize_single_index(
-            item.times, ["times"], index_norm, dynamic_strings, string_max_len
+            item.times, ["times"], index_norm, dynamic_strings, string_max_len, empty_types=empty_types
         )
         columns_names, columns_vals = _normalize_columns(
             item.columns_names,
@@ -1087,7 +1088,7 @@ def __init__(self, fallback_normalizer=None, use_norm_failure_handler_known_type
         self.msg_pack_denorm = MsgPackNormalizer()  # must exist for deserialization
         self.fallback_normalizer = fallback_normalizer

-    def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs):
+    def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
         normalizer = self.get_normalizer_for_type(item)

         if not normalizer:
@@ -1099,6 +1100,7 @@ def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_co
             string_max_len=string_max_len,
             dynamic_strings=dynamic_strings,
             coerce_columns=coerce_columns,
+            empty_types=empty_types,
             **kwargs,
         )
@@ -1136,7 +1138,7 @@ def get_normalizer_for_type(self, item):
         return None

     def normalize(
-        self, item, string_max_len=None, pickle_on_failure=False, dynamic_strings=False, coerce_columns=None, **kwargs
+        self, item, string_max_len=None, pickle_on_failure=False, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs
     ):
         """
         :param item: Item to be normalized to something Arctic Native understands.
@@ -1145,13 +1147,13 @@ def normalize(
         :param pickle_on_failure: This will fallback to pickling the Supported objects (DataFrame, Series, TimeFrame)
             even if use_norm_failure_handler_known_types was not configured at the library level.
""" - try: return self._normalize( item, string_max_len=string_max_len, dynamic_strings=dynamic_strings, coerce_columns=coerce_columns, + empty_types=empty_types, **kwargs, ) except Exception as ex: diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index 68abc3571e..c0fc92adcf 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -332,6 +332,7 @@ def _try_normalize( dynamic_schema = self.resolve_defaults( "dynamic_schema", self._lib_cfg.lib_desc.version.write_options, False, **kwargs ) + empty_types = self.resolve_defaults("empty_types", self._lib_cfg.lib_desc.version.write_options, False) try: udm = normalize_metadata(metadata) if metadata is not None else None opt_custom = self._custom_normalizer.normalize(dataframe) @@ -347,6 +348,7 @@ def _try_normalize( dynamic_strings=dynamic_strings, coerce_columns=coerce_columns, dynamic_schema=dynamic_schema, + empty_types=empty_types, **kwargs, ) except ArcticDbNotYetImplemented as ex: diff --git a/python/tests/unit/arcticdb/version_store/test_empty_column_type.py b/python/tests/unit/arcticdb/version_store/test_empty_column_type.py index b0311cdbdd..d15a406e36 100644 --- a/python/tests/unit/arcticdb/version_store/test_empty_column_type.py +++ b/python/tests/unit/arcticdb/version_store/test_empty_column_type.py @@ -11,6 +11,9 @@ from pandas.testing import assert_frame_equal import numpy as np import pytest +from packaging.version import Version +from arcticdb.util._versions import PANDAS_VERSION +import arcticdb class DtypeGenerator: """ @@ -584,6 +587,7 @@ def append_index(self, request): @pytest.fixture(autouse=True) def create_empty_column(self, lmdb_version_store_static_and_dynamic, dtype, empty_index): lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) + assert lmdb_version_store_static_and_dynamic.read("sym").data.index.equals(pd.DatetimeIndex([])) yield def test_integer(self, lmdb_version_store_static_and_dynamic, int_dtype, dtype, append_index): @@ -744,6 +748,7 @@ class TestCanUpdateEmptyColumn: @pytest.fixture(autouse=True) def create_empty_column(self, lmdb_version_store_static_and_dynamic, dtype, empty_index): lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) + assert lmdb_version_store_static_and_dynamic.read("sym").data.index.equals(pd.DatetimeIndex([])) yield def update_index(self): @@ -806,14 +811,187 @@ def test_cannot_append_different_type_after_first_not_none(self, lmdb_version_st lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": ["some", "string"]})) @pytest.mark.parametrize( - "incompatible_indexes", + "index, incompatible_index", [ (pd.RangeIndex(0,3), list(pd.date_range(start="1/1/2024", end="1/3/2024"))), (list(pd.date_range(start="1/1/2024", end="1/3/2024")), pd.RangeIndex(0, 3)) ] ) - def test_cannot_append_different_index_type_after_first_non_empty(self, lmdb_version_store_static_and_dynamic, incompatible_indexes): + def test_cannot_append_different_index_type_after_first_non_empty(self, lmdb_version_store_static_and_dynamic, index, incompatible_index): lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []})) - lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": [1,2,3]}, index=incompatible_indexes[0])) + assert lmdb_version_store_static_and_dynamic.read("sym").data.index.equals(pd.DatetimeIndex([])) + df_to_append_successfuly = pd.DataFrame({"col": 
diff --git a/python/tests/unit/arcticdb/version_store/test_empty_column_type.py b/python/tests/unit/arcticdb/version_store/test_empty_column_type.py
index b0311cdbdd..d15a406e36 100644
--- a/python/tests/unit/arcticdb/version_store/test_empty_column_type.py
+++ b/python/tests/unit/arcticdb/version_store/test_empty_column_type.py
@@ -11,6 +11,9 @@
 from pandas.testing import assert_frame_equal
 import numpy as np
 import pytest
+from packaging.version import Version
+from arcticdb.util._versions import PANDAS_VERSION
+import arcticdb


 class DtypeGenerator:
     """
@@ -584,6 +587,7 @@ def append_index(self, request):
     @pytest.fixture(autouse=True)
     def create_empty_column(self, lmdb_version_store_static_and_dynamic, dtype, empty_index):
         lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index))
+        assert lmdb_version_store_static_and_dynamic.read("sym").data.index.equals(pd.DatetimeIndex([]))
         yield

     def test_integer(self, lmdb_version_store_static_and_dynamic, int_dtype, dtype, append_index):
@@ -744,6 +748,7 @@ class TestCanUpdateEmptyColumn:
     @pytest.fixture(autouse=True)
     def create_empty_column(self, lmdb_version_store_static_and_dynamic, dtype, empty_index):
         lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index))
+        assert lmdb_version_store_static_and_dynamic.read("sym").data.index.equals(pd.DatetimeIndex([]))
         yield

     def update_index(self):
@@ -806,14 +811,187 @@ def test_cannot_append_different_type_after_first_not_none(self, lmdb_version_st
         lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": ["some", "string"]}))

     @pytest.mark.parametrize(
-        "incompatible_indexes",
+        "index, incompatible_index",
         [
             (pd.RangeIndex(0,3), list(pd.date_range(start="1/1/2024", end="1/3/2024"))),
             (list(pd.date_range(start="1/1/2024", end="1/3/2024")), pd.RangeIndex(0, 3))
         ]
     )
-    def test_cannot_append_different_index_type_after_first_non_empty(self, lmdb_version_store_static_and_dynamic, incompatible_indexes):
+    def test_cannot_append_different_index_type_after_first_non_empty(self, lmdb_version_store_static_and_dynamic, index, incompatible_index):
         lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []}))
-        lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": [1,2,3]}, index=incompatible_indexes[0]))
+        assert lmdb_version_store_static_and_dynamic.read("sym").data.index.equals(pd.DatetimeIndex([]))
+        df_to_append_successfully = pd.DataFrame({"col": [1,2,3]}, index=index)
+        lmdb_version_store_static_and_dynamic.append("sym", df_to_append_successfully, validate_index=False)
+        assert_frame_equal(lmdb_version_store_static_and_dynamic.read("sym").data, df_to_append_successfully)
         with pytest.raises(Exception):
-            lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": [4, 5, 6]}, index=incompatible_indexes[1]))
\ No newline at end of file
+            lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": [4, 5, 6]}, index=incompatible_index))
+
+
+class DisabledEmptyIndexBase:
+    @classmethod
+    def sym(cls):
+        return "sym"
+
+    @classmethod
+    def roundtrip(cls, dataframe, storage):
+        storage.write(cls.sym(), dataframe)
+        return storage.read(cls.sym()).data
+
+    @classmethod
+    def is_dynamic_schema(cls, storage):
+        return storage.lib_cfg().lib_desc.version.write_options.dynamic_schema
+
+    @pytest.fixture(
+        scope="function",
+        params=(
+            "lmdb_version_store_v1",
+            "lmdb_version_store_v2",
+            "lmdb_version_store_dynamic_schema_v1",
+            "lmdb_version_store_dynamic_schema_v2",
+        ),
+    )
+    def lmdb_version_store_static_and_dynamic(self, request):
+        yield request.getfixturevalue(request.param)
+
+
+@pytest.mark.skipif(PANDAS_VERSION < Version("2.0.0"), reason="This tests behavior of Pandas 2 and greater.")
+class TestIndexTypeWithEmptyTypeDisabledPandas2AndLater(DisabledEmptyIndexBase):
+
+    def test_no_cols(self, lmdb_version_store_static_and_dynamic):
+        result = self.roundtrip(pd.DataFrame([]), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.DatetimeIndex([]))
+
+    def test_has_a_column(self, lmdb_version_store_static_and_dynamic):
+        if self.is_dynamic_schema(lmdb_version_store_static_and_dynamic):
+            pytest.xfail(
+                """For empty symbols, dynamic schema allows appending indexes of different types.
+                See https://github.com/man-group/ArcticDB/issues/1507. This is fixed with the empty index type."""
+            )
+        result = self.roundtrip(pd.DataFrame({"a": []}), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.DatetimeIndex([]))
+        with pytest.raises(Exception):
+            lmdb_version_store_static_and_dynamic.append(self.sym(), pd.DataFrame({"a": [1.0]}))
+        to_append_successfully = pd.DataFrame({"a": [1.0]}, index=pd.DatetimeIndex(["01/01/2024"]))
+        lmdb_version_store_static_and_dynamic.append(self.sym(), to_append_successfully)
+        assert_frame_equal(
+            lmdb_version_store_static_and_dynamic.read(self.sym()).data,
+            to_append_successfully
+        )
+
+    def test_explicit_row_range_no_columns(self, lmdb_version_store_static_and_dynamic):
+        result = self.roundtrip(pd.DataFrame([], index=pd.RangeIndex(start=5, stop=5, step=100)), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.DatetimeIndex([]))
+    def test_explicit_row_range_with_columns(self, lmdb_version_store_static_and_dynamic):
+        if self.is_dynamic_schema(lmdb_version_store_static_and_dynamic):
+            pytest.xfail(
+                """For empty symbols, dynamic schema allows appending indexes of different types.
+                See https://github.com/man-group/ArcticDB/issues/1507. This is fixed with the empty index type."""
+            )
+        result = self.roundtrip(pd.DataFrame({"a": []}, index=pd.RangeIndex(start=5, stop=5, step=100)), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.DatetimeIndex([]))
+        with pytest.raises(Exception):
+            lmdb_version_store_static_and_dynamic.append(self.sym(), pd.DataFrame({"a": [1.0]}, pd.RangeIndex(start=0, stop=1, step=1)))
+        to_append_successfully = pd.DataFrame({"a": [1.0]}, index=pd.DatetimeIndex(["01/01/2024"]))
+        lmdb_version_store_static_and_dynamic.append(self.sym(), to_append_successfully)
+        assert_frame_equal(
+            lmdb_version_store_static_and_dynamic.read(self.sym()).data,
+            to_append_successfully
+        )
+
+    def test_explicit_rowrange_default_step(self, lmdb_version_store_static_and_dynamic):
+        result = self.roundtrip(pd.DataFrame({"a": []}, index=pd.RangeIndex(start=0, stop=0)), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.DatetimeIndex([]))
+
+    def test_explicit_datetime(self, lmdb_version_store_static_and_dynamic):
+        result = self.roundtrip(pd.DataFrame({"a": []}, index=pd.DatetimeIndex([])), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.DatetimeIndex([]))
+
+    @pytest.mark.parametrize("arrays, expected_arrays", [
+        ([[], []], ([np.array([], dtype="datetime64[ns]"), np.array([], dtype="object")])),
+        ([np.array([], dtype="float"), np.array([], dtype="int")], ([np.array([], dtype="float"), np.array([], dtype="int")])),
+        ([np.array([], dtype="int"), np.array([], dtype="float")], ([np.array([], dtype="int"), np.array([], "float")])),
+        ([np.array([], dtype="datetime64[ns]"), np.array([], dtype="float64")], ([np.array([], dtype="datetime64[ns]"), np.array([], dtype="float")]))
+    ])
+    def test_multiindex(self, lmdb_version_store_static_and_dynamic, arrays, expected_arrays):
+        # When a MultiIndex is used, the dtypes are preserved. If default empty numpy arrays are used, e.g.
+        # pd.MultiIndex.from_arrays([np.array([]), np.array([])], names=["p", "s"]), the result varies depending on
+        # numpy's defaults.
+        input_index = pd.MultiIndex.from_arrays(arrays, names=["p", "s"])
+        result = self.roundtrip(pd.DataFrame({"a": []}, index=input_index), lmdb_version_store_static_and_dynamic)
+        expected_multiindex = pd.MultiIndex.from_arrays(expected_arrays, names=["p", "s"])
+        assert result.index.equals(expected_multiindex)
+
+
+@pytest.mark.skipif(PANDAS_VERSION >= Version("2.0.0"), reason="This tests only the behavior with Pandas < 2.")
+class TestIndexTypeWithEmptyTypeDisabledPandas0AndPandas1(DisabledEmptyIndexBase):
+    @classmethod
+    def multiindex_dtypes(cls, index):
+        """
+        The MultiIndex class in Pandas < 2 does not have a dtypes method. This emulates it.
+        """
+        return pd.Series({name: level.dtype for name, level in zip(index.names, index.levels)})
+    def test_no_cols(self, lmdb_version_store_static_and_dynamic):
+        result = self.roundtrip(pd.DataFrame([]), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.DatetimeIndex([]))
+
+    def test_has_a_column(self, lmdb_version_store_static_and_dynamic):
+        result = self.roundtrip(pd.DataFrame({"a": []}), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.RangeIndex(start=0, stop=0, step=1))
+        with pytest.raises(arcticdb.exceptions.NormalizationException):
+            lmdb_version_store_static_and_dynamic.append(self.sym(), pd.DataFrame({"a": ["a"]}, index=pd.DatetimeIndex(["01/01/2024"])))
+        to_append_successfully = pd.DataFrame({"a": ["a"]})
+        lmdb_version_store_static_and_dynamic.append(self.sym(), to_append_successfully)
+        assert_frame_equal(lmdb_version_store_static_and_dynamic.read(self.sym()).data, to_append_successfully)
+
+    def test_explicit_row_range_no_columns(self, lmdb_version_store_static_and_dynamic):
+        result = self.roundtrip(pd.DataFrame([], index=pd.RangeIndex(start=5, stop=5, step=100)), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.RangeIndex(start=0, stop=0, step=1))
+
+    def test_explicit_row_range_with_columns(self, lmdb_version_store_static_and_dynamic):
+        if self.is_dynamic_schema(lmdb_version_store_static_and_dynamic):
+            pytest.xfail(
+                """Dynamic schema should not allow appending mismatching range indexes. See issue:
+                https://github.com/man-group/ArcticDB/issues/1509"""
+            )
+        result = self.roundtrip(pd.DataFrame({"a": []}, index=pd.RangeIndex(start=5, stop=5, step=100)), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.RangeIndex(start=5, stop=5, step=100))
+        # Cannot append a datetime-indexed df to an empty rowrange index
+        with pytest.raises(arcticdb.exceptions.NormalizationException):
+            lmdb_version_store_static_and_dynamic.append(self.sym(), pd.DataFrame({"a": ["a"]}, index=pd.DatetimeIndex(["01/01/2024"])))
+        # Cannot append a rowrange-indexed df if the start of the appended index does not match the stop of the empty one
+        with pytest.raises(arcticdb.exceptions.NormalizationException):
+            lmdb_version_store_static_and_dynamic.append(self.sym(), pd.DataFrame({"a": ["a"]}, index=pd.RangeIndex(start=9, stop=109, step=100)))
+        with pytest.raises(arcticdb.exceptions.NormalizationException):
+            lmdb_version_store_static_and_dynamic.append(self.sym(), pd.DataFrame({"a": ["a"]}, index=pd.RangeIndex(start=10, stop=110, step=100)))
+        # Cannot append a rowrange-indexed df if the step is different
+        with pytest.raises(arcticdb.exceptions.NormalizationException):
+            lmdb_version_store_static_and_dynamic.append(self.sym(), pd.DataFrame({"a": ["a"]}, index=pd.RangeIndex(start=5, stop=6, step=1)))
+        to_append_successfully = pd.DataFrame({"a": ["a"]}, index=pd.RangeIndex(start=5, stop=105, step=100))
+        lmdb_version_store_static_and_dynamic.append(self.sym(), to_append_successfully)
+        assert_frame_equal(
+            lmdb_version_store_static_and_dynamic.read(self.sym()).data,
+            to_append_successfully
+        )
+
+    def test_explicit_rowrange_default_step(self, lmdb_version_store_static_and_dynamic):
+        result = self.roundtrip(pd.DataFrame({"a": []}, index=pd.RangeIndex(start=0, stop=0)), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.RangeIndex(start=0, stop=0, step=1))
+
+    def test_explicit_datetime(self, lmdb_version_store_static_and_dynamic):
+        result = self.roundtrip(pd.DataFrame({"a": []}, index=pd.DatetimeIndex([])), lmdb_version_store_static_and_dynamic)
+        assert result.index.equals(pd.DatetimeIndex([]))
+    @pytest.mark.parametrize("arrays, expected_arrays", [
+        ([[], []], ([np.array([], dtype="datetime64[ns]"), np.array([], dtype="object")])),
+        ([np.array([], dtype="float"), np.array([], dtype="int")], ([np.array([], dtype="object"), np.array([], dtype="int")])),
+        ([np.array([], dtype="int"), np.array([], dtype="float")], ([np.array([], dtype="int"), np.array([], "object")])),
+        ([np.array([], dtype="datetime64[ns]"), np.array([], dtype="float64")], ([np.array([], dtype="datetime64[ns]"), np.array([], dtype="object")]))
+    ])
+    def test_multiindex(self, lmdb_version_store_static_and_dynamic, arrays, expected_arrays):
+        # When a MultiIndex is used, the dtypes are preserved. If default empty numpy arrays are used, e.g.
+        # pd.MultiIndex.from_arrays([np.array([]), np.array([])], names=["p", "s"]), the result varies depending on
+        # numpy's defaults.
+        input_index = pd.MultiIndex.from_arrays(arrays, names=["p", "s"])
+        result = self.roundtrip(pd.DataFrame({"a": []}, index=input_index), lmdb_version_store_static_and_dynamic)
+        expected_multiindex = pd.MultiIndex.from_arrays(expected_arrays, names=["p", "s"])
+        assert result.index.equals(expected_multiindex)
diff --git a/python/tests/unit/arcticdb/version_store/test_empty_writes.py b/python/tests/unit/arcticdb/version_store/test_empty_writes.py
index 67bb439fbe..dd07071321 100644
--- a/python/tests/unit/arcticdb/version_store/test_empty_writes.py
+++ b/python/tests/unit/arcticdb/version_store/test_empty_writes.py
@@ -146,33 +146,20 @@ def test_empty_series(lmdb_version_store_dynamic_schema, sym):
     assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, ser, check_index_type=False)

-@pytest.mark.parametrize("dtype, existing_empty, append_empty", [
-    ("int64", True, False),
-    ("float64", True, False),
-    ("float64", False, True),
-    ("float64", True, True),
+@pytest.mark.parametrize("dtype, series, append_series", [
+    ("int64", pd.Series([]), pd.Series([1, 2, 3], dtype="int64")),
+    ("float64", pd.Series([]), pd.Series([1, 2, 3], dtype="float64")),
+    ("float64", pd.Series([1, 2, 3], dtype="float64"), pd.Series([])),
+    ("float64", pd.Series([]), pd.Series([])),
 ])
-def test_append_empty_series(lmdb_version_store_dynamic_schema, sym, dtype, existing_empty, append_empty):
-    empty_ser = pd.Series([])
-    non_empty_ser = pd.Series([1, 2, 3], dtype=dtype)
-    if existing_empty:
-        ser = empty_ser
-    else:
-        ser = non_empty_ser
-    lmdb_version_store_dynamic_schema.write(sym, ser)
+def test_append_empty_series(lmdb_version_store_dynamic_schema, sym, dtype, series, append_series):
+    lmdb_version_store_dynamic_schema.write(sym, series)
     assert not lmdb_version_store_dynamic_schema.is_symbol_pickled(sym)
-
     # ArcticDB stores empty columns under a dedicated `EMPTYVAL` type, so the types are not going to match with pandas
     # if the series is empty.
-    assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, ser, check_index_type=(len(ser) > 0))
-
-    if append_empty:
-        new_ser = empty_ser
-    else:
-        new_ser = non_empty_ser
-    lmdb_version_store_dynamic_schema.append(sym, new_ser)
-
-    result_ser = pd.concat([ser, new_ser])
+    assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, series, check_index_type=(len(series) > 0))
+    lmdb_version_store_dynamic_schema.append(sym, append_series)
+    result_ser = pd.concat([series, append_series])
     assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, result_ser, check_index_type=(len(result_ser) > 0))
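The `check_index_type=(len(series) > 0)` guards above exist because an empty series does not round-trip its index type: it is written with one index class but can be read back with another (see the normalization changes earlier in this diff). A pandas-only illustration of why the relaxed comparison is needed:

```python
import pandas as pd
from pandas.testing import assert_series_equal

expected = pd.Series([], dtype="float64")   # default (empty) RangeIndex
actual = expected.copy()
actual.index = pd.DatetimeIndex([])         # what a Pandas >= 2 read-back may produce

# Fails with check_index_type=True; passes once index-type checking is disabled,
# which is what the tests above do whenever the expected series is empty.
assert_series_equal(actual, expected, check_index_type=False)
```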