Feature flag empty_index #1475

Merged · 9 commits · Apr 23, 2024
17 changes: 8 additions & 9 deletions cpp/arcticdb/python/python_to_tensor_frame.cpp
@@ -116,7 +116,7 @@ NativeTensor obj_to_tensor(PyObject *ptr, bool empty_types) {
const auto arr = pybind11::detail::array_proxy(ptr);
const auto descr = pybind11::detail::array_descriptor_proxy(arr->descr);
auto ndim = arr->nd;
-    ssize_t size = ndim == 1 ? arr->dimensions[0] : arr->dimensions[0] * arr->dimensions[1];
+    const ssize_t size = ndim == 1 ? arr->dimensions[0] : arr->dimensions[0] * arr->dimensions[1];
// In Pandas < 2, empty series dtype is `"float"`, but as of Pandas 2.0, empty series dtype is `"object"`
// The Normalizer in Python casts empty `"float"` series to `"object"`, so `EMPTY` is used here.
// See: https://github.com/man-group/ArcticDB/pull/1049
@@ -264,17 +264,16 @@ std::shared_ptr<InputTensorFrame> py_ndf_to_frame(

// idx_names are passed by the python layer. They are empty in case a row count index is used; see:
// https://github.com/man-group/ArcticDB/blob/4184a467d9eee90600ddcbf34d896c763e76f78f/python/arcticdb/version_store/_normalization.py#L291
-// Currently the python layers assign RowRange index to both empty dataframes and dataframes wich do not specify
+// Currently the python layers assign RowRange index to both empty dataframes and dataframes which do not specify
// index explicitly. Thus we handle this case after all columns are read, so that we know how many rows there are.
if (idx_names.empty()) {
-        if (res->num_rows > 0) {
-            res->index = stream::RowCountIndex();
-            res->desc.set_index_type(IndexDescriptor::ROWCOUNT);
-        } else {
-            res->index = stream::EmptyIndex();
-            res->desc.set_index_type(IndexDescriptor::EMPTY);
-        }
+        res->index = stream::RowCountIndex();
+        res->desc.set_index_type(IndexDescriptor::ROWCOUNT);
}

+    if (empty_types && res->num_rows == 0) {
+        res->index = stream::EmptyIndex();
+        res->desc.set_index_type(IndexDescriptor::EMPTY);
+    }

ARCTICDB_DEBUG(log::version(), "Received frame with descriptor {}", res->desc);
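For reference, the net effect of this hunk in isolation: a frame with no index names now always gets a row-count index, and the empty index is only assigned behind the `empty_types` flag (for any zero-row frame). A minimal compilable sketch of that decision, with `IndexType`, `select_index_type`, and `has_index_names` as illustrative stand-ins rather than ArcticDB APIs:

```cpp
#include <cstdint>
#include <iostream>

// Illustrative stand-in for IndexDescriptor's index kinds.
enum class IndexType { TIMESTAMP, ROWCOUNT, EMPTY };

// Mirrors the new control flow in py_ndf_to_frame: row-count index by default
// when no index names are supplied, downgraded to the empty index only when
// the empty_types feature flag is on and the frame has zero rows.
IndexType select_index_type(bool has_index_names, std::uint64_t num_rows, bool empty_types) {
    IndexType result = IndexType::TIMESTAMP; // placeholder: an explicit index was supplied
    if (!has_index_names) {
        result = IndexType::ROWCOUNT;        // previously this became EMPTY when num_rows == 0
    }
    if (empty_types && num_rows == 0) {
        result = IndexType::EMPTY;           // the empty-index path is now opt-in
    }
    return result;
}

int main() {
    // Flag off: an empty, unindexed frame keeps the pre-flag row-count index.
    std::cout << (select_index_type(false, 0, false) == IndexType::ROWCOUNT) << '\n'; // prints 1
    // Flag on: the same frame gets the empty index.
    std::cout << (select_index_type(false, 0, true) == IndexType::EMPTY) << '\n';     // prints 1
}
```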
8 changes: 5 additions & 3 deletions cpp/arcticdb/version/local_versioned_engine.cpp
@@ -587,7 +587,8 @@ VersionedItem LocalVersionedEngine::update_internal(
query,
frame,
get_write_options(),
-        dynamic_schema);
+        dynamic_schema,
+        cfg().write_options().empty_types());
write_version_and_prune_previous(
prune_previous_versions, versioned_item.key_, update_info.previous_index_key_);
return versioned_item;
@@ -1398,7 +1399,8 @@ VersionedItem LocalVersionedEngine::append_internal(
update_info,
frame,
get_write_options(),
-        validate_index);
+        validate_index,
+        cfg().write_options().empty_types());
write_version_and_prune_previous(
prune_previous_versions, versioned_item.key_, update_info.previous_index_key_);
return versioned_item;
@@ -1446,7 +1448,7 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
auto index_key_fut = folly::Future<AtomKey>::makeEmpty();
auto write_options = get_write_options();
if (update_info.previous_index_key_.has_value()) {
-        index_key_fut = async_append_impl(store(), update_info, frame, write_options, validate_index);
+        index_key_fut = async_append_impl(store(), update_info, frame, write_options, validate_index, cfg().write_options().empty_types());
} else {
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(
upsert,
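The engine-layer changes above are plumbing: each call site resolves the flag from the library's write options (`cfg().write_options().empty_types()`) and forwards it explicitly, so the lower layers never re-read configuration. A reduced sketch of that pattern; `Config`, `WriteOptionsCfg`, and `Engine` are hypothetical stand-ins for the real config and engine types:

```cpp
#include <utility>

// Hypothetical stand-ins: in ArcticDB the flag lives in the library config's
// write_options() and is reached through cfg() on the engine.
struct WriteOptionsCfg { bool empty_types = false; };
struct Config {
    WriteOptionsCfg write_options_;
    const WriteOptionsCfg& write_options() const { return write_options_; }
};

// Lower layer: takes the flag as an argument and stays configuration-free.
void append_impl(bool /*validate_index*/, bool /*empty_types*/) { /* ... */ }

class Engine {
public:
    explicit Engine(Config cfg) : cfg_(std::move(cfg)) {}
    // Mirrors append_internal: the flag is resolved once, at the engine boundary.
    void append_internal(bool validate_index) {
        append_impl(validate_index, cfg().write_options().empty_types());
    }
private:
    const Config& cfg() const { return cfg_; }
    Config cfg_;
};
```

Passing the flag down as a parameter keeps version_core free of config lookups and makes the flag's reach visible in every signature it touches.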
61 changes: 44 additions & 17 deletions cpp/arcticdb/version/schema_checks.hpp
@@ -42,42 +42,68 @@ inline IndexDescriptor::Type get_common_index_type(const IndexDescriptor::Type&
return IndexDescriptor::UNKNOWN;
}

-inline void check_normalization_index_match(NormalizationOperation operation,
-                                            const StreamDescriptor &old_descriptor,
-                                            const pipelines::InputTensorFrame &frame) {
+inline void check_normalization_index_match(
+    NormalizationOperation operation,
+    const StreamDescriptor& old_descriptor,
+    const pipelines::InputTensorFrame& frame,
+    bool empty_types
+) {
const IndexDescriptor::Type old_idx_kind = old_descriptor.index().type();
const IndexDescriptor::Type new_idx_kind = frame.desc.index().type();
if (operation == UPDATE) {
const bool new_is_timeseries = std::holds_alternative<TimeseriesIndex>(frame.index);
util::check_rte(
(old_idx_kind == IndexDescriptor::TIMESTAMP || old_idx_kind == IndexDescriptor::EMPTY) && new_is_timeseries,
"Update will not work as expected with a non-timeseries index");
"Update will not work as expected with a non-timeseries index"
);
} else {
const IndexDescriptor::Type common_index_type = get_common_index_type(old_idx_kind, new_idx_kind);
-        normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
-            common_index_type != IndexDescriptor::UNKNOWN,
-            "Cannot append {} index to {} index",
-            index_type_to_str(new_idx_kind),
-            index_type_to_str(old_idx_kind)
-        );
+        if (empty_types) {
+            normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
+                common_index_type != IndexDescriptor::UNKNOWN,
+                "Cannot append {} index to {} index",
+                index_type_to_str(new_idx_kind),
+                index_type_to_str(old_idx_kind)
+            );
+        } else {
+            // (old_idx_kind == IndexDescriptor::TIMESTAMP && new_idx_kind == IndexDescriptor::ROWCOUNT) is left to
+            // preserve pre-empty-index behavior with pandas 2, see test_empty_writes.py::test_append_empty_series.
+            // An empty pd.Series has a RowRange index, but due to:
+            // https://github.com/man-group/ArcticDB/blob/bd1776291fe402d8b18af9fea865324ebd7705f1/python/arcticdb/version_store/_normalization.py#L545
+            // it gets converted to a DatetimeIndex (in pandas 2, all empty indexes except categorical and multiindex
+            // are converted to a datetime index if the empty index type is disabled); however, we still want to be
+            // able to append a pd.Series to an empty pd.Series. Even with this exception, RowCount-indexed
+            // pd.DataFrames cannot be appended to DateTime-indexed pd.DataFrames, because they would have a
+            // different field count (the rowcount index is not stored as a field). This logic is bug-prone and will
+            // improve once the empty index is enabled.
+            normalization::check<ErrorCode::E_INCOMPATIBLE_INDEX>(
+                common_index_type != IndexDescriptor::UNKNOWN ||
+                    (old_idx_kind == IndexDescriptor::TIMESTAMP && new_idx_kind == IndexDescriptor::ROWCOUNT),
+                "Cannot append {} index to {} index",
+                index_type_to_str(new_idx_kind),
+                index_type_to_str(old_idx_kind)
+            );
+        }
}
}

-inline bool columns_match(const StreamDescriptor& df_in_store_descriptor, const StreamDescriptor& new_df_descriptor) {
-    const int right_fields_offset = df_in_store_descriptor.index().type() == IndexDescriptor::EMPTY ? new_df_descriptor.index().field_count() : 0;
+inline bool columns_match(
+    const StreamDescriptor& df_in_store_descriptor,
+    const StreamDescriptor& new_df_descriptor
+) {
+    const int index_field_size =
+        df_in_store_descriptor.index().type() == IndexDescriptor::EMPTY ? new_df_descriptor.index().field_count() : 0;
// The empty index is compatible with all other index types, so differences in the index fields are allowed in
// this case. The index fields are always the first in the list.
-    if (df_in_store_descriptor.fields().size() + right_fields_offset != new_df_descriptor.fields().size()) {
+    if (df_in_store_descriptor.fields().size() + index_field_size != new_df_descriptor.fields().size()) {
return false;
}
// In case the left index is empty index we want to skip name/type checking of the index fields which are always
// the first fields.
for (auto i = 0; i < int(df_in_store_descriptor.fields().size()); ++i) {
-        if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i + right_fields_offset).name())
+        if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i + index_field_size).name())
return false;

const TypeDescriptor& left_type = df_in_store_descriptor.fields(i).type();
-        const TypeDescriptor& right_type = new_df_descriptor.fields(i + right_fields_offset).type();
+        const TypeDescriptor& right_type = new_df_descriptor.fields(i + index_field_size).type();

if (!trivially_compatible_types(left_type, right_type) &&
!(is_empty_type(left_type.data_type()) || is_empty_type(right_type.data_type())))
@@ -90,9 +116,10 @@ inline void fix_descriptor_mismatch_or_throw(
NormalizationOperation operation,
bool dynamic_schema,
const pipelines::index::IndexSegmentReader &existing_isr,
-    const pipelines::InputTensorFrame &new_frame) {
+    const pipelines::InputTensorFrame &new_frame,
+    bool empty_types) {
const auto &old_sd = existing_isr.tsd().as_stream_descriptor();
-    check_normalization_index_match(operation, old_sd, new_frame);
+    check_normalization_index_match(operation, old_sd, new_frame, empty_types);

if (dynamic_schema)
return; // TODO: dynamic schema may need some of the checks as below
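To make the two append-path regimes concrete, here is a compilable sketch of `check_normalization_index_match`'s append branch reduced to a predicate. The body of `common_index_type` is an assumption inferred from the visible `UNKNOWN` fallback and the `columns_match` comment (the empty index unifies with anything, identical kinds unify, everything else is incompatible); the rest follows the diff above:

```cpp
#include <cassert>

enum class Idx { TIMESTAMP, ROWCOUNT, EMPTY, UNKNOWN };

// Assumed semantics of get_common_index_type (only its UNKNOWN fallback is
// visible in this diff): EMPTY unifies with anything, equal kinds unify.
Idx common_index_type(Idx old_kind, Idx new_kind) {
    if (old_kind == new_kind) return old_kind;
    if (old_kind == Idx::EMPTY) return new_kind;
    if (new_kind == Idx::EMPTY) return old_kind;
    return Idx::UNKNOWN;
}

// The append branch of check_normalization_index_match, as a predicate.
bool append_allowed(Idx old_kind, Idx new_kind, bool empty_types) {
    const Idx common = common_index_type(old_kind, new_kind);
    if (empty_types)
        return common != Idx::UNKNOWN;
    // Legacy regime keeps the TIMESTAMP <- ROWCOUNT carve-out so that an empty
    // pd.Series (normalized to a DatetimeIndex by pandas 2) stays appendable.
    return common != Idx::UNKNOWN ||
           (old_kind == Idx::TIMESTAMP && new_kind == Idx::ROWCOUNT);
}

int main() {
    // Identical kinds and the empty index unify in both regimes.
    assert(append_allowed(Idx::TIMESTAMP, Idx::TIMESTAMP, true));
    assert(append_allowed(Idx::EMPTY, Idx::ROWCOUNT, true));
    // The regimes differ on row-count data appended onto a timestamp index.
    assert(append_allowed(Idx::TIMESTAMP, Idx::ROWCOUNT, false)); // legacy: allowed
    assert(!append_allowed(Idx::TIMESTAMP, Idx::ROWCOUNT, true)); // empty_types: rejected
}
```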
16 changes: 10 additions & 6 deletions cpp/arcticdb/version/version_core.cpp
@@ -137,7 +137,8 @@ folly::Future<AtomKey> async_append_impl(
const UpdateInfo& update_info,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions& options,
-    bool validate_index) {
+    bool validate_index,
+    bool empty_types) {

util::check(update_info.previous_index_key_.has_value(), "Cannot append as there is no previous index key to append to");
const StreamId stream_id = frame->desc.id();
@@ -147,7 +148,7 @@
auto row_offset = index_segment_reader.tsd().proto().total_rows();
util::check_rte(!index_segment_reader.is_pickled(), "Cannot append to pickled data");
frame->set_offset(static_cast<ssize_t>(row_offset));
-    fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame);
+    fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame, empty_types);
if (validate_index) {
sorted_data_check_append(*frame, index_segment_reader);
}
@@ -162,14 +163,16 @@ VersionedItem append_impl(
const UpdateInfo& update_info,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions& options,
-    bool validate_index) {
+    bool validate_index,
+    bool empty_types) {

ARCTICDB_SUBSAMPLE_DEFAULT(WaitForWriteCompletion)
auto version_key_fut = async_append_impl(store,
update_info,
frame,
options,
-        validate_index);
+        validate_index,
+        empty_types);
auto version_key = std::move(version_key_fut).get();
auto versioned_item = VersionedItem(to_atom(std::move(version_key)));
ARCTICDB_DEBUG(log::version(), "write_dataframe_impl stream_id: {} , version_id: {}", versioned_item.symbol(), update_info.next_version_id_);
@@ -323,7 +326,8 @@ VersionedItem update_impl(
const UpdateQuery& query,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions&& options,
-    bool dynamic_schema) {
+    bool dynamic_schema,
+    bool empty_types) {
util::check(update_info.previous_index_key_.has_value(), "Cannot update as there is no previous index key to update into");
const StreamId stream_id = frame->desc.id();
ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", stream_id, update_info.previous_index_key_->version_id());
@@ -337,7 +341,7 @@
sorted_data_check_update(*frame, index_segment_reader);
bool bucketize_dynamic = index_segment_reader.bucketize_dynamic();
(void)check_and_mark_slices(index_segment_reader, dynamic_schema, false, std::nullopt, bucketize_dynamic);
-    fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame);
+    fix_descriptor_mismatch_or_throw(UPDATE, dynamic_schema, index_segment_reader, *frame, empty_types);

std::vector<FilterQuery<index::IndexSegmentReader>> queries =
build_update_query_filters<index::IndexSegmentReader>(query.row_filter, frame->index, frame->index_range, dynamic_schema, index_segment_reader.bucketize_dynamic());
9 changes: 6 additions & 3 deletions cpp/arcticdb/version/version_core.hpp
@@ -54,22 +54,25 @@ folly::Future<AtomKey> async_append_impl(
const UpdateInfo& update_info,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions& options,
-    bool validate_index);
+    bool validate_index,
+    bool empty_types);

VersionedItem append_impl(
const std::shared_ptr<Store>& store,
const UpdateInfo& update_info,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions& options,
-    bool validate_index);
+    bool validate_index,
+    bool empty_types);

VersionedItem update_impl(
const std::shared_ptr<Store>& store,
const UpdateInfo& update_info,
const UpdateQuery & query,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions&& options,
-    bool dynamic_schema);
+    bool dynamic_schema,
+    bool empty_types);

VersionedItem delete_range_impl(
const std::shared_ptr<Store>& store,