From 2effe0c322ab83f3fbc68fb28b4dcbac5c564c8b Mon Sep 17 00:00:00 2001 From: Vasil Pashov Date: Wed, 10 Apr 2024 10:10:53 +0300 Subject: [PATCH] Feature flag the empty type using library config --- .../python/python_to_tensor_frame.cpp | 2 +- .../version/local_versioned_engine.cpp | 8 +-- cpp/arcticdb/version/schema_checks.hpp | 51 +++++++++++++------ cpp/arcticdb/version/version_core.cpp | 16 +++--- cpp/arcticdb/version/version_core.hpp | 9 ++-- .../arcticdb/version_store/_normalization.py | 31 +++++------ python/arcticdb/version_store/_store.py | 4 ++ .../version_store/test_empty_writes.py | 33 ++++-------- 8 files changed, 88 insertions(+), 66 deletions(-) diff --git a/cpp/arcticdb/python/python_to_tensor_frame.cpp b/cpp/arcticdb/python/python_to_tensor_frame.cpp index f1a54fd8d5..8017f3b9b4 100644 --- a/cpp/arcticdb/python/python_to_tensor_frame.cpp +++ b/cpp/arcticdb/python/python_to_tensor_frame.cpp @@ -267,7 +267,7 @@ std::shared_ptr py_ndf_to_frame( // Currently the python layers assign RowRange index to both empty dataframes and dataframes wich do not specify // index explicitly. Thus we handle this case after all columns are read so that we know how many rows are there. if (idx_names.empty()) { - if (res->num_rows > 0) { + if (!empty_types || res->num_rows > 0) { res->index = stream::RowCountIndex(); res->desc.set_index_type(IndexDescriptor::ROWCOUNT); } else { diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 244b7c8fae..c88a3e3908 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -587,7 +587,8 @@ VersionedItem LocalVersionedEngine::update_internal( query, frame, get_write_options(), - dynamic_schema); + dynamic_schema, + cfg().write_options().empty_types()); write_version_and_prune_previous( prune_previous_versions, versioned_item.key_, update_info.previous_index_key_); return versioned_item; @@ -1398,7 +1399,8 @@ VersionedItem LocalVersionedEngine::append_internal( update_info, frame, get_write_options(), - validate_index); + validate_index, + cfg().write_options().empty_types()); write_version_and_prune_previous( prune_previous_versions, versioned_item.key_, update_info.previous_index_key_); return versioned_item; @@ -1446,7 +1448,7 @@ std::vector> LocalVersionedEngine::batch_ auto index_key_fut = folly::Future::makeEmpty(); auto write_options = get_write_options(); if (update_info.previous_index_key_.has_value()) { - index_key_fut = async_append_impl(store(), update_info, frame, write_options, validate_index); + index_key_fut = async_append_impl(store(), update_info, frame, write_options, validate_index, cfg().write_options().empty_types()); } else { missing_data::check( upsert, diff --git a/cpp/arcticdb/version/schema_checks.hpp b/cpp/arcticdb/version/schema_checks.hpp index 004bd14b3a..ed4f61e5c0 100644 --- a/cpp/arcticdb/version/schema_checks.hpp +++ b/cpp/arcticdb/version/schema_checks.hpp @@ -42,29 +42,49 @@ inline IndexDescriptor::Type get_common_index_type(const IndexDescriptor::Type& return IndexDescriptor::UNKNOWN; } -inline void check_normalization_index_match(NormalizationOperation operation, - const StreamDescriptor &old_descriptor, - const pipelines::InputTensorFrame &frame) { +inline void check_normalization_index_match( + NormalizationOperation operation, + const StreamDescriptor& old_descriptor, + const pipelines::InputTensorFrame& frame, + bool empty_types +) { const IndexDescriptor::Type old_idx_kind = old_descriptor.index().type(); const IndexDescriptor::Type new_idx_kind = frame.desc.index().type(); if (operation == UPDATE) { const bool new_is_timeseries = std::holds_alternative(frame.index); util::check_rte( (old_idx_kind == IndexDescriptor::TIMESTAMP || old_idx_kind == IndexDescriptor::EMPTY) && new_is_timeseries, - "Update will not work as expected with a non-timeseries index"); + "Update will not work as expected with a non-timeseries index" + ); } else { const IndexDescriptor::Type common_index_type = get_common_index_type(old_idx_kind, new_idx_kind); - normalization::check( - common_index_type != IndexDescriptor::UNKNOWN, - "Cannot append {} index to {} index", - index_type_to_str(new_idx_kind), - index_type_to_str(old_idx_kind) - ); + if (empty_types) { + normalization::check( + common_index_type != IndexDescriptor::UNKNOWN, + "Cannot append {} index to {} index", + index_type_to_str(new_idx_kind), + index_type_to_str(old_idx_kind) + ); + } else { + normalization::check( + common_index_type != IndexDescriptor::UNKNOWN || + (old_idx_kind == IndexDescriptor::TIMESTAMP && new_idx_kind == IndexDescriptor::ROWCOUNT), + "Cannot append {} index to {} index", + index_type_to_str(new_idx_kind), + index_type_to_str(old_idx_kind) + ); + } } } -inline bool columns_match(const StreamDescriptor& df_in_store_descriptor, const StreamDescriptor& new_df_descriptor) { - const int right_fields_offset = df_in_store_descriptor.index().type() == IndexDescriptor::EMPTY ? new_df_descriptor.index().field_count() : 0; +inline bool columns_match( + const StreamDescriptor& df_in_store_descriptor, + const StreamDescriptor& new_df_descriptor, + bool empty_types +) { + const int right_fields_offset = (empty_types && df_in_store_descriptor.index().type() == IndexDescriptor::EMPTY) + ? new_df_descriptor.index().field_count() + : 0; // The empty index is compatible with all other index types. Differences in the index fields in this case is // allowed. The index fields are always the first in the list. if (df_in_store_descriptor.fields().size() + right_fields_offset != new_df_descriptor.fields().size()) { @@ -90,16 +110,17 @@ inline void fix_descriptor_mismatch_or_throw( NormalizationOperation operation, bool dynamic_schema, const pipelines::index::IndexSegmentReader &existing_isr, - const pipelines::InputTensorFrame &new_frame) { + const pipelines::InputTensorFrame &new_frame, + bool empty_types) { const auto &old_sd = existing_isr.tsd().as_stream_descriptor(); - check_normalization_index_match(operation, old_sd, new_frame); + check_normalization_index_match(operation, old_sd, new_frame, empty_types); if (dynamic_schema) return; // TODO: dynamic schema may need some of the checks as below fix_normalization_or_throw(operation == APPEND, existing_isr, new_frame); - if (!columns_match(old_sd, new_frame.desc)) { + if (!columns_match(old_sd, new_frame.desc, empty_types)) { throw StreamDescriptorMismatch( "The columns (names and types) in the argument are not identical to that of the existing version", StreamDescriptor{old_sd}, diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 5f3f3a56ae..1ffc869af1 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -137,7 +137,8 @@ folly::Future async_append_impl( const UpdateInfo& update_info, const std::shared_ptr& frame, const WriteOptions& options, - bool validate_index) { + bool validate_index, + bool empty_types) { util::check(update_info.previous_index_key_.has_value(), "Cannot append as there is no previous index key to append to"); const StreamId stream_id = frame->desc.id(); @@ -147,7 +148,7 @@ folly::Future async_append_impl( auto row_offset = index_segment_reader.tsd().proto().total_rows(); util::check_rte(!index_segment_reader.is_pickled(), "Cannot append to pickled data"); frame->set_offset(static_cast(row_offset)); - fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame); + fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame, empty_types); if (validate_index) { sorted_data_check_append(*frame, index_segment_reader); } @@ -162,14 +163,16 @@ VersionedItem append_impl( const UpdateInfo& update_info, const std::shared_ptr& frame, const WriteOptions& options, - bool validate_index) { + bool validate_index, + bool empty_types) { ARCTICDB_SUBSAMPLE_DEFAULT(WaitForWriteCompletion) auto version_key_fut = async_append_impl(store, update_info, frame, options, - validate_index); + validate_index, + empty_types); auto version_key = std::move(version_key_fut).get(); auto versioned_item = VersionedItem(to_atom(std::move(version_key))); ARCTICDB_DEBUG(log::version(), "write_dataframe_impl stream_id: {} , version_id: {}", versioned_item.symbol(), update_info.next_version_id_); @@ -323,7 +326,8 @@ VersionedItem update_impl( const UpdateQuery& query, const std::shared_ptr& frame, const WriteOptions&& options, - bool dynamic_schema) { + bool dynamic_schema, + bool empty_types) { util::check(update_info.previous_index_key_.has_value(), "Cannot update as there is no previous index key to update into"); const StreamId stream_id = frame->desc.id(); ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", stream_id, update_info.previous_index_key_->version_id()); @@ -337,7 +341,7 @@ VersionedItem update_impl( sorted_data_check_update(*frame, index_segment_reader); bool bucketize_dynamic = index_segment_reader.bucketize_dynamic(); (void)check_and_mark_slices(index_segment_reader, dynamic_schema, false, std::nullopt, bucketize_dynamic); - fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame); + fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame, empty_types); std::vector> queries = build_update_query_filters(query.row_filter, frame->index, frame->index_range, dynamic_schema, index_segment_reader.bucketize_dynamic()); diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index c7d488b2a2..9247c61136 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -54,14 +54,16 @@ folly::Future async_append_impl( const UpdateInfo& update_info, const std::shared_ptr& frame, const WriteOptions& options, - bool validate_index); + bool validate_index, + bool empty_types); VersionedItem append_impl( const std::shared_ptr& store, const UpdateInfo& update_info, const std::shared_ptr& frame, const WriteOptions& options, - bool validate_index); + bool validate_index, + bool empty_types); VersionedItem update_impl( const std::shared_ptr& store, @@ -69,7 +71,8 @@ VersionedItem update_impl( const UpdateQuery & query, const std::shared_ptr& frame, const WriteOptions&& options, - bool dynamic_schema); + bool dynamic_schema, + bool empty_types); VersionedItem delete_range_impl( const std::shared_ptr& store, diff --git a/python/arcticdb/version_store/_normalization.py b/python/arcticdb/version_store/_normalization.py index a2dfb73f82..ebab1cfe80 100644 --- a/python/arcticdb/version_store/_normalization.py +++ b/python/arcticdb/version_store/_normalization.py @@ -370,9 +370,9 @@ def _denormalize_single_index(item, norm_meta): name = norm_meta.index.name if norm_meta.index.name else None return RangeIndex(start=norm_meta.index.start, stop=stop, step=norm_meta.index.step, name=name) else: - return Index([]) + return None else: - return Index([]) + return RangeIndex(start=0, stop=0, step=1) # this means that the index is not a datetime index and it's been represented as a regular field in the stream item.index_columns.append(item.names.pop(0)) @@ -523,9 +523,10 @@ def denormalize(self, item, norm_meta): class _PandasNormalizer(Normalizer): - def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len): + def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len, empty_types): index = df.index - if len(index) == 0 and len(df.select_dtypes(include="category").columns) == 0: + empty_df = len(index) == 0 + if empty_df and empty_types: index_norm = pd_norm.index index_norm.is_physically_stored = False index = Index([]) @@ -550,12 +551,10 @@ def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len): df.reset_index(fields, inplace=True) index = df.index else: - is_not_range_index = not isinstance(index, RangeIndex) - df_has_rows = not(len(index) == 0 and len(df.select_dtypes(include="category").columns) == 0) index_norm = pd_norm.index - index_norm.is_physically_stored = is_not_range_index and df_has_rows - if not df_has_rows: - index = Index([]) + index_norm.is_physically_stored = not isinstance(index, RangeIndex) and not empty_df + if empty_df: + index = DatetimeIndex([]) return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len) @@ -587,12 +586,13 @@ class SeriesNormalizer(_PandasNormalizer): def __init__(self): self._df_norm = DataFrameNormalizer() - def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs): + def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs): df, norm = self._df_norm.normalize( item.to_frame(), dynamic_strings=dynamic_strings, string_max_len=string_max_len, coerce_columns=coerce_columns, + empty_types=empty_types ) norm.series.CopyFrom(norm.df) if item.name: @@ -843,7 +843,7 @@ def _denormalize_multi_index(df: pd.DataFrame, norm_meta: NormalizationMetadata. return df - def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs): + def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs): # type: (DataFrame, Optional[int])->NormalizedInput norm_meta = NormalizationMetadata() norm_meta.df.common.mark = True @@ -859,7 +859,7 @@ def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_col raise ArcticDbNotYetImplemented("MultiIndex column are not supported yet") index_names, ix_vals = self._index_to_records( - item, norm_meta.df.common, dynamic_strings, string_max_len=string_max_len + item, norm_meta.df.common, dynamic_strings, string_max_len=string_max_len, empty_types=empty_types ) # The first branch of this if is faster, but does not work with null/duplicated column names if item.columns.is_unique and not item.columns.hasnans: @@ -1173,7 +1173,7 @@ def __init__(self, fallback_normalizer=None, use_norm_failure_handler_known_type self.msg_pack_denorm = MsgPackNormalizer() # must exist for deserialization self.fallback_normalizer = fallback_normalizer - def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, **kwargs): + def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs): normalizer = self.get_normalizer_for_type(item) if not normalizer: @@ -1185,6 +1185,7 @@ def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_co string_max_len=string_max_len, dynamic_strings=dynamic_strings, coerce_columns=coerce_columns, + empty_types=empty_types, **kwargs, ) @@ -1222,7 +1223,7 @@ def get_normalizer_for_type(self, item): return None def normalize( - self, item, string_max_len=None, pickle_on_failure=False, dynamic_strings=False, coerce_columns=None, **kwargs + self, item, string_max_len=None, pickle_on_failure=False, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs ): """ :param item: Item to be normalized to something Arctic Native understands. @@ -1231,13 +1232,13 @@ def normalize( :param pickle_on_failure: This will fallback to pickling the Supported objects (DataFrame, Series, TimeFrame) even if use_norm_failure_handler_known_types was not configured at the library level. """ - try: return self._normalize( item, string_max_len=string_max_len, dynamic_strings=dynamic_strings, coerce_columns=coerce_columns, + empty_types=empty_types, **kwargs, ) except Exception as ex: diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index 68abc3571e..f26b987f36 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -332,6 +332,9 @@ def _try_normalize( dynamic_schema = self.resolve_defaults( "dynamic_schema", self._lib_cfg.lib_desc.version.write_options, False, **kwargs ) + empty_types = self.resolve_defaults( + "empty_types", self._lib_cfg.lib_desc.version.write_options, False, **kwargs + ) try: udm = normalize_metadata(metadata) if metadata is not None else None opt_custom = self._custom_normalizer.normalize(dataframe) @@ -347,6 +350,7 @@ def _try_normalize( dynamic_strings=dynamic_strings, coerce_columns=coerce_columns, dynamic_schema=dynamic_schema, + empty_types=empty_types, **kwargs, ) except ArcticDbNotYetImplemented as ex: diff --git a/python/tests/unit/arcticdb/version_store/test_empty_writes.py b/python/tests/unit/arcticdb/version_store/test_empty_writes.py index 67bb439fbe..dd07071321 100644 --- a/python/tests/unit/arcticdb/version_store/test_empty_writes.py +++ b/python/tests/unit/arcticdb/version_store/test_empty_writes.py @@ -146,33 +146,20 @@ def test_empty_series(lmdb_version_store_dynamic_schema, sym): assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, ser, check_index_type=False) -@pytest.mark.parametrize("dtype, existing_empty, append_empty", [ - ("int64", True, False), - ("float64", True, False), - ("float64", False, True), - ("float64", True, True), +@pytest.mark.parametrize("dtype, series, append_series", [ + ("int64", pd.Series([]), pd.Series([1, 2, 3], dtype="int64")), + ("float64", pd.Series([]), pd.Series([1, 2, 3], dtype="float64")), + ("float64", pd.Series([1, 2, 3], dtype="float64"), pd.Series([])), + ("float64", pd.Series([]), pd.Series([])), ]) -def test_append_empty_series(lmdb_version_store_dynamic_schema, sym, dtype, existing_empty, append_empty): - empty_ser = pd.Series([]) - non_empty_ser = pd.Series([1, 2, 3], dtype=dtype) - if existing_empty: - ser = empty_ser - else: - ser = non_empty_ser - lmdb_version_store_dynamic_schema.write(sym, ser) +def test_append_empty_series(lmdb_version_store_dynamic_schema, sym, dtype, series, append_series): + lmdb_version_store_dynamic_schema.write(sym, series) assert not lmdb_version_store_dynamic_schema.is_symbol_pickled(sym) - # ArcticDB stores empty columns under a dedicated `EMPTYVAL` type, so the types are not going to match with pandas # if the series is empty. - assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, ser, check_index_type=(len(ser) > 0)) - - if append_empty: - new_ser = empty_ser - else: - new_ser = non_empty_ser - lmdb_version_store_dynamic_schema.append(sym, new_ser) - - result_ser = pd.concat([ser, new_ser]) + assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, series, check_index_type=(len(series) > 0)) + lmdb_version_store_dynamic_schema.append(sym, append_series) + result_ser = pd.concat([series, append_series]) assert_series_equal(lmdb_version_store_dynamic_schema.read(sym).data, result_ser, check_index_type=(len(result_ser) > 0))