
Feature flag empty_index (#1475)
#### Reference Issues/PRs
Relates to: #1429, #1440

#### What does this implement or fix?
Feature-flag the use of the empty index type. It uses the same library config
option as the empty column type. To preserve existing behavior, empty
dataframes will use a DatetimeIndex when the flag is disabled.
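
A minimal usage sketch of the flag from the Python API. Hedged: `LibraryOptions(empty_types=...)` is an assumed spelling of the config option, and the backend URI and symbol names are placeholders:

```python
import pandas as pd
from arcticdb import Arctic, LibraryOptions

ac = Arctic("lmdb:///tmp/arcticdb")  # illustrative local backend

# Flag off (default): an empty dataframe is normalized with a DatetimeIndex,
# preserving the pre-existing behavior described above.
lib = ac.get_library("plain", create_if_missing=True)
lib.write("sym", pd.DataFrame({"col": []}))
assert isinstance(lib.read("sym").data.index, pd.DatetimeIndex)

# Flag on: zero-row frames are written with the EMPTY index type instead.
lib_empty = ac.get_library(
    "with_empty_types",
    create_if_missing=True,
    library_options=LibraryOptions(empty_types=True),  # assumed option name
)
lib_empty.write("sym", pd.DataFrame({"col": []}))
```
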
#### Any other comments?

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>


---------

Co-authored-by: Vasil Pashov <[email protected]>
vasil-pashov and Vasil Pashov committed Apr 24, 2024
1 parent e9fa562 commit 2c1cd87
Showing 9 changed files with 294 additions and 90 deletions.
17 changes: 8 additions & 9 deletions cpp/arcticdb/python/python_to_tensor_frame.cpp
@@ -116,7 +116,7 @@ NativeTensor obj_to_tensor(PyObject *ptr, bool empty_types) {
     const auto arr = pybind11::detail::array_proxy(ptr);
     const auto descr = pybind11::detail::array_descriptor_proxy(arr->descr);
     auto ndim = arr->nd;
-    ssize_t size = ndim == 1 ? arr->dimensions[0] : arr->dimensions[0] * arr->dimensions[1];
+    const ssize_t size = ndim == 1 ? arr->dimensions[0] : arr->dimensions[0] * arr->dimensions[1];
     // In Pandas < 2, empty series dtype is `"float"`, but as of Pandas 2.0, empty series dtype is `"object"`
     // The Normalizer in Python cast empty `"float"` series to `"object"` so `EMPTY` is used here.
     // See: https://github.com/man-group/ArcticDB/pull/1049
@@ -264,17 +264,16 @@ std::shared_ptr<InputTensorFrame> py_ndf_to_frame(

     // idx_names are passed by the python layer. They are empty in case row count index is used see:
     // https://github.com/man-group/ArcticDB/blob/4184a467d9eee90600ddcbf34d896c763e76f78f/python/arcticdb/version_store/_normalization.py#L291
-    // Currently the python layers assign RowRange index to both empty dataframes and dataframes wich do not specify
+    // Currently the python layers assign RowRange index to both empty dataframes and dataframes which do not specify
     // index explicitly. Thus we handle this case after all columns are read so that we know how many rows are there.
     if (idx_names.empty()) {
-        if (res->num_rows > 0) {
-            res->index = stream::RowCountIndex();
-            res->desc.set_index_type(IndexDescriptor::ROWCOUNT);
-        } else {
-            res->index = stream::EmptyIndex();
-            res->desc.set_index_type(IndexDescriptor::EMPTY);
-        }
+        res->index = stream::RowCountIndex();
+        res->desc.set_index_type(IndexDescriptor::ROWCOUNT);
     }

+    if (empty_types && res->num_rows == 0) {
+        res->index = stream::EmptyIndex();
+        res->desc.set_index_type(IndexDescriptor::EMPTY);
+    }

     ARCTICDB_DEBUG(log::version(), "Received frame with descriptor {}", res->desc);
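The net effect of this hunk: frames with no index names default to a row-count index, and a separate, flag-gated pass switches any zero-row frame to the empty index. A Python paraphrase of the decision, illustrative only (the constant names stand in for IndexDescriptor values):

```python
# Illustrative paraphrase of the C++ index selection above; EMPTY, ROWCOUNT
# and TIMESTAMP stand in for the IndexDescriptor enum values.
EMPTY, ROWCOUNT, TIMESTAMP = "EMPTY", "ROWCOUNT", "TIMESTAMP"

def choose_index_type(idx_names, num_rows, current_type, empty_types):
    """Default to ROWCOUNT when no index names were passed from Python,
    then override with EMPTY only when the feature flag is on and the
    frame has zero rows."""
    index_type = current_type
    if not idx_names:
        index_type = ROWCOUNT
    if empty_types and num_rows == 0:
        index_type = EMPTY
    return index_type

assert choose_index_type([], 0, None, empty_types=False) == ROWCOUNT
assert choose_index_type([], 0, None, empty_types=True) == EMPTY
# The override also applies to explicitly indexed zero-row frames.
assert choose_index_type(["ts"], 0, TIMESTAMP, empty_types=True) == EMPTY
```
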
8 changes: 5 additions & 3 deletions cpp/arcticdb/version/local_versioned_engine.cpp
@@ -587,7 +587,8 @@ VersionedItem LocalVersionedEngine::update_internal(
         query,
         frame,
         get_write_options(),
-        dynamic_schema);
+        dynamic_schema,
+        cfg().write_options().empty_types());
     write_version_and_prune_previous(
         prune_previous_versions, versioned_item.key_, update_info.previous_index_key_);
     return versioned_item;
@@ -1398,7 +1399,8 @@ VersionedItem LocalVersionedEngine::append_internal(
         update_info,
         frame,
         get_write_options(),
-        validate_index);
+        validate_index,
+        cfg().write_options().empty_types());
     write_version_and_prune_previous(
         prune_previous_versions, versioned_item.key_, update_info.previous_index_key_);
     return versioned_item;
@@ -1446,7 +1448,7 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
     auto index_key_fut = folly::Future<AtomKey>::makeEmpty();
     auto write_options = get_write_options();
     if (update_info.previous_index_key_.has_value()) {
-        index_key_fut = async_append_impl(store(), update_info, frame, write_options, validate_index);
+        index_key_fut = async_append_impl(store(), update_info, frame, write_options, validate_index, cfg().write_options().empty_types());
     } else {
         missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(
             upsert,
61 changes: 44 additions & 17 deletions cpp/arcticdb/version/schema_checks.hpp
@@ -42,42 +42,68 @@ inline IndexDescriptor::Type get_common_index_type(const IndexDescriptor::Type&
     return IndexDescriptor::UNKNOWN;
 }

-inline void check_normalization_index_match(NormalizationOperation operation,
-    const StreamDescriptor &old_descriptor,
-    const pipelines::InputTensorFrame &frame) {
+inline void check_normalization_index_match(
+    NormalizationOperation operation,
+    const StreamDescriptor& old_descriptor,
+    const pipelines::InputTensorFrame& frame,
+    bool empty_types
+) {
     const IndexDescriptor::Type old_idx_kind = old_descriptor.index().type();
     const IndexDescriptor::Type new_idx_kind = frame.desc.index().type();
     if (operation == UPDATE) {
         const bool new_is_timeseries = std::holds_alternative<TimeseriesIndex>(frame.index);
         util::check_rte(
             (old_idx_kind == IndexDescriptor::TIMESTAMP || old_idx_kind == IndexDescriptor::EMPTY) && new_is_timeseries,
-            "Update will not work as expected with a non-timeseries index");
+            "Update will not work as expected with a non-timeseries index"
+        );
     } else {
         const IndexDescriptor::Type common_index_type = get_common_index_type(old_idx_kind, new_idx_kind);
-        normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
-            common_index_type != IndexDescriptor::UNKNOWN,
-            "Cannot append {} index to {} index",
-            index_type_to_str(new_idx_kind),
-            index_type_to_str(old_idx_kind)
-        );
+        if (empty_types) {
+            normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
+                common_index_type != IndexDescriptor::UNKNOWN,
+                "Cannot append {} index to {} index",
+                index_type_to_str(new_idx_kind),
+                index_type_to_str(old_idx_kind)
+            );
+        } else {
+            // (old_idx_kind == IndexDescriptor::TIMESTAMP && new_idx_kind == IndexDescriptor::ROWCOUNT) is left to preserve
+            // pre-empty index behavior with pandas 2, see test_empty_writes.py::test_append_empty_series. Empty pd.Series
+            // have Rowrange index, but due to: https://github.com/man-group/ArcticDB/blob/bd1776291fe402d8b18af9fea865324ebd7705f1/python/arcticdb/version_store/_normalization.py#L545
+            // it gets converted to DatetimeIndex (all empty indexes except categorical and multiindex are converted to datetime index
+            // in pandas 2 if empty index type is disabled), however we still want to be able to append pd.Series to empty pd.Series.
+            // Having this will not allow appending RowCont indexed pd.DataFrames to DateTime indexed pd.DataFrames because they would
+            // have different field size (the rowcount index is not stored as a field). This logic is bug prone and will become better
+            // after we enable the empty index.
+            normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
+                common_index_type != IndexDescriptor::UNKNOWN ||
+                    (old_idx_kind == IndexDescriptor::TIMESTAMP && new_idx_kind == IndexDescriptor::ROWCOUNT),
+                "Cannot append {} index to {} index",
+                index_type_to_str(new_idx_kind),
+                index_type_to_str(old_idx_kind)
+            );
+        }
     }
 }

-inline bool columns_match(const StreamDescriptor& df_in_store_descriptor, const StreamDescriptor& new_df_descriptor) {
-    const int right_fields_offset = df_in_store_descriptor.index().type() == IndexDescriptor::EMPTY ? new_df_descriptor.index().field_count() : 0;
+inline bool columns_match(
+    const StreamDescriptor& df_in_store_descriptor,
+    const StreamDescriptor& new_df_descriptor
+) {
+    const int index_field_size =
+        df_in_store_descriptor.index().type() == IndexDescriptor::EMPTY ? new_df_descriptor.index().field_count() : 0;
     // The empty index is compatible with all other index types. Differences in the index fields in this case is
     // allowed. The index fields are always the first in the list.
-    if (df_in_store_descriptor.fields().size() + right_fields_offset != new_df_descriptor.fields().size()) {
+    if (df_in_store_descriptor.fields().size() + index_field_size != new_df_descriptor.fields().size()) {
         return false;
     }
     // In case the left index is empty index we want to skip name/type checking of the index fields which are always
     // the first fields.
     for (auto i = 0; i < int(df_in_store_descriptor.fields().size()); ++i) {
-        if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i + right_fields_offset).name())
+        if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i + index_field_size).name())
             return false;

         const TypeDescriptor& left_type = df_in_store_descriptor.fields(i).type();
-        const TypeDescriptor& right_type = new_df_descriptor.fields(i + right_fields_offset).type();
+        const TypeDescriptor& right_type = new_df_descriptor.fields(i + index_field_size).type();

         if (!trivially_compatible_types(left_type, right_type) &&
             !(is_empty_type(left_type.data_type()) || is_empty_type(right_type.data_type())))
@@ -90,9 +116,10 @@ inline void fix_descriptor_mismatch_or_throw(
     NormalizationOperation operation,
     bool dynamic_schema,
     const pipelines::index::IndexSegmentReader &existing_isr,
-    const pipelines::InputTensorFrame &new_frame) {
+    const pipelines::InputTensorFrame &new_frame,
+    bool empty_types) {
     const auto &old_sd = existing_isr.tsd().as_stream_descriptor();
-    check_normalization_index_match(operation, old_sd, new_frame);
+    check_normalization_index_match(operation, old_sd, new_frame, empty_types);

     if (dynamic_schema)
         return; // TODO: dynamic schema may need some of the checks as below
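For the else branch above, a sketch of the append sequence it keeps working while the flag is off, loosely following test_empty_writes.py::test_append_empty_series (the LMDB URI, library and symbol names are illustrative):

```python
import pandas as pd
from arcticdb import Arctic

# Illustrative setup; URI and names are placeholders.
lib = Arctic("lmdb:///tmp/arcticdb").get_library("demo", create_if_missing=True)

# With empty types disabled, the normalization layer converts the empty
# Series' RangeIndex to a DatetimeIndex, so it is stored as TIMESTAMP.
lib.write("series_sym", pd.Series(dtype="float64"))

# The non-empty Series is ROWCOUNT-indexed; the TIMESTAMP/ROWCOUNT carve-out
# in the else branch lets this append through rather than raising
# E_INCOMPATIBLE_INDEX.
lib.append("series_sym", pd.Series([1.0, 2.0]))
```
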
16 changes: 10 additions & 6 deletions cpp/arcticdb/version/version_core.cpp
@@ -137,7 +137,8 @@ folly::Future<AtomKey> async_append_impl(
     const UpdateInfo& update_info,
     const std::shared_ptr<InputTensorFrame>& frame,
     const WriteOptions& options,
-    bool validate_index) {
+    bool validate_index,
+    bool empty_types) {

     util::check(update_info.previous_index_key_.has_value(), "Cannot append as there is no previous index key to append to");
     const StreamId stream_id = frame->desc.id();
@@ -147,7 +148,7 @@ folly::Future<AtomKey> async_append_impl(
     auto row_offset = index_segment_reader.tsd().proto().total_rows();
     util::check_rte(!index_segment_reader.is_pickled(), "Cannot append to pickled data");
     frame->set_offset(static_cast<ssize_t>(row_offset));
-    fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame);
+    fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame, empty_types);
     if (validate_index) {
         sorted_data_check_append(*frame, index_segment_reader);
     }
@@ -162,14 +163,16 @@ VersionedItem append_impl(
     const UpdateInfo& update_info,
     const std::shared_ptr<InputTensorFrame>& frame,
     const WriteOptions& options,
-    bool validate_index) {
+    bool validate_index,
+    bool empty_types) {

     ARCTICDB_SUBSAMPLE_DEFAULT(WaitForWriteCompletion)
     auto version_key_fut = async_append_impl(store,
         update_info,
         frame,
         options,
-        validate_index);
+        validate_index,
+        empty_types);
     auto version_key = std::move(version_key_fut).get();
     auto versioned_item = VersionedItem(to_atom(std::move(version_key)));
     ARCTICDB_DEBUG(log::version(), "write_dataframe_impl stream_id: {} , version_id: {}", versioned_item.symbol(), update_info.next_version_id_);
@@ -323,7 +326,8 @@ VersionedItem update_impl(
     const UpdateQuery& query,
     const std::shared_ptr<InputTensorFrame>& frame,
     const WriteOptions&& options,
-    bool dynamic_schema) {
+    bool dynamic_schema,
+    bool empty_types) {
     util::check(update_info.previous_index_key_.has_value(), "Cannot update as there is no previous index key to update into");
     const StreamId stream_id = frame->desc.id();
     ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", stream_id, update_info.previous_index_key_->version_id());
@@ -337,7 +341,7 @@ VersionedItem update_impl(
     sorted_data_check_update(*frame, index_segment_reader);
     bool bucketize_dynamic = index_segment_reader.bucketize_dynamic();
     (void)check_and_mark_slices(index_segment_reader, dynamic_schema, false, std::nullopt, bucketize_dynamic);
-    fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame);
+    fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame, empty_types);

     std::vector<FilterQuery<index::IndexSegmentReader>> queries =
         build_update_query_filters<index::IndexSegmentReader>(query.row_filter, frame->index, frame->index_range, dynamic_schema, index_segment_reader.bucketize_dynamic());
9 changes: 6 additions & 3 deletions cpp/arcticdb/version/version_core.hpp
@@ -54,22 +54,25 @@ folly::Future<AtomKey> async_append_impl(
     const UpdateInfo& update_info,
     const std::shared_ptr<InputTensorFrame>& frame,
     const WriteOptions& options,
-    bool validate_index);
+    bool validate_index,
+    bool empty_types);

 VersionedItem append_impl(
     const std::shared_ptr<Store>& store,
     const UpdateInfo& update_info,
     const std::shared_ptr<InputTensorFrame>& frame,
     const WriteOptions& options,
-    bool validate_index);
+    bool validate_index,
+    bool empty_types);

 VersionedItem update_impl(
     const std::shared_ptr<Store>& store,
     const UpdateInfo& update_info,
     const UpdateQuery & query,
     const std::shared_ptr<InputTensorFrame>& frame,
     const WriteOptions&& options,
-    bool dynamic_schema);
+    bool dynamic_schema,
+    bool empty_types);

 VersionedItem delete_range_impl(
     const std::shared_ptr<Store>& store,
