diff --git a/cpp/arcticdb/entity/merge_descriptors.cpp b/cpp/arcticdb/entity/merge_descriptors.cpp index ed8ea636cf..a41a8cfe18 100644 --- a/cpp/arcticdb/entity/merge_descriptors.cpp +++ b/cpp/arcticdb/entity/merge_descriptors.cpp @@ -46,8 +46,13 @@ StreamDescriptor merge_descriptors( // Merge all the fields for all slices, apart from the index which we already have from the first descriptor. // Note that we preserve the ordering as we see columns, especially the index which needs to be column 0. for (const auto &fields : entries) { - if(has_index) - util::variant_match(index, [&fields] (const auto& idx) { idx.check(*fields); }); + if (has_index) { + util::variant_match(index, + [](const EmptyIndex&) {}, + [](const RowCountIndex&) {}, + [&fields] (const auto& idx) { idx.check(*fields); } + ); + } for (size_t idx = has_index ? 1u : 0u; idx < static_cast(fields->size()); ++idx) { const auto& field = fields->at(idx); diff --git a/cpp/arcticdb/entity/stream_descriptor.hpp b/cpp/arcticdb/entity/stream_descriptor.hpp index 25a9a1a71c..d6812dc203 100644 --- a/cpp/arcticdb/entity/stream_descriptor.hpp +++ b/cpp/arcticdb/entity/stream_descriptor.hpp @@ -22,7 +22,6 @@ struct StreamDescriptor { std::shared_ptr data_ = std::make_shared(); std::shared_ptr fields_ = std::make_shared(); - ; StreamDescriptor() = default; ~StreamDescriptor() = default; @@ -65,7 +64,7 @@ struct StreamDescriptor { data_->set_sorted(sorted_value_to_proto(sorted)); } - SortedValue get_sorted() { + SortedValue get_sorted() const { return sorted_value_from_proto(data_->sorted()); } diff --git a/cpp/arcticdb/entity/types_proto.cpp b/cpp/arcticdb/entity/types_proto.cpp index 5fd69d7e8a..5d53a3092f 100644 --- a/cpp/arcticdb/entity/types_proto.cpp +++ b/cpp/arcticdb/entity/types_proto.cpp @@ -123,4 +123,69 @@ namespace arcticdb::entity { }, id); } + IndexDescriptor::IndexDescriptor(size_t field_count, Type type) { + data_.set_kind(type); + data_.set_field_count(static_cast(field_count)); + } + + IndexDescriptor::IndexDescriptor(arcticdb::proto::descriptors::IndexDescriptor data) + : data_(std::move(data)) { + } + + bool IndexDescriptor::uninitialized() const { + return data_.field_count() == 0 && data_.kind() == Type::IndexDescriptor_Type_UNKNOWN; + } + + const IndexDescriptor::Proto& IndexDescriptor::proto() const { + return data_; + } + + size_t IndexDescriptor::field_count() const { + return static_cast(data_.field_count()); + } + + IndexDescriptor::Type IndexDescriptor::type() const { + return data_.kind(); + } + + void IndexDescriptor::set_type(Type type) { + data_.set_kind(type); + } + + bool operator==(const IndexDescriptor& left, const IndexDescriptor& right) { + return left.type() == right.type(); + } + + IndexDescriptor::TypeChar to_type_char(IndexDescriptor::Type type) { + switch (type) { + case IndexDescriptor::EMPTY: return 'E'; + case IndexDescriptor::TIMESTAMP: return 'T'; + case IndexDescriptor::ROWCOUNT: return 'R'; + case IndexDescriptor::STRING: return 'S'; + case IndexDescriptor::UNKNOWN: return 'U'; + default: util::raise_rte("Unknown index type: {}", int(type)); + } + } + + IndexDescriptor::Type from_type_char(IndexDescriptor::TypeChar type) { + switch (type) { + case 'E': return IndexDescriptor::EMPTY; + case 'T': return IndexDescriptor::TIMESTAMP; + case 'R': return IndexDescriptor::ROWCOUNT; + case 'S': return IndexDescriptor::STRING; + case 'U': return IndexDescriptor::UNKNOWN; + default: util::raise_rte("Unknown index type: {}", int(type)); + } + } + + const char* index_type_to_str(IndexDescriptor::Type type) { + switch (type) { + case IndexDescriptor::EMPTY: return "Empty"; + case IndexDescriptor::TIMESTAMP: return "Timestamp"; + case IndexDescriptor::ROWCOUNT: return "Row count"; + case IndexDescriptor::STRING: return "String"; + case IndexDescriptor::UNKNOWN: return "Unknown"; + default: util::raise_rte("Unknown index type: {}", int(type)); + } + } } // namespace arcticdb diff --git a/cpp/arcticdb/entity/types_proto.hpp b/cpp/arcticdb/entity/types_proto.hpp index ed54518239..be95972fa0 100644 --- a/cpp/arcticdb/entity/types_proto.hpp +++ b/cpp/arcticdb/entity/types_proto.hpp @@ -49,69 +49,29 @@ namespace arcticdb::entity { Proto data_; using Type = arcticdb::proto::descriptors::IndexDescriptor::Type; - static const Type UNKNOWN = arcticdb::proto::descriptors::IndexDescriptor_Type_UNKNOWN; - static const Type ROWCOUNT = arcticdb::proto::descriptors::IndexDescriptor_Type_ROWCOUNT; - static const Type STRING = arcticdb::proto::descriptors::IndexDescriptor_Type_STRING; - static const Type TIMESTAMP = arcticdb::proto::descriptors::IndexDescriptor_Type_TIMESTAMP; + static constexpr Type UNKNOWN = arcticdb::proto::descriptors::IndexDescriptor_Type_UNKNOWN; + static constexpr Type EMPTY = arcticdb::proto::descriptors::IndexDescriptor_Type_EMPTY; + static constexpr Type ROWCOUNT = arcticdb::proto::descriptors::IndexDescriptor_Type_ROWCOUNT; + static constexpr Type STRING = arcticdb::proto::descriptors::IndexDescriptor_Type_STRING; + static constexpr Type TIMESTAMP = arcticdb::proto::descriptors::IndexDescriptor_Type_TIMESTAMP; using TypeChar = char; IndexDescriptor() = default; - IndexDescriptor(size_t field_count, Type type) { - data_.set_kind(type); - data_.set_field_count(static_cast(field_count)); - } - - explicit IndexDescriptor(arcticdb::proto::descriptors::IndexDescriptor data) - : data_(std::move(data)) { - } - - bool uninitialized() const { - return data_.field_count() == 0 && data_.kind() == Type::IndexDescriptor_Type_UNKNOWN; - } - - const Proto& proto() const { - return data_; - } - - size_t field_count() const { - return static_cast(data_.field_count()); - } - - Type type() const { - return data_.kind(); - } - - void set_type(Type type) { - data_.set_kind(type); - } - ARCTICDB_MOVE_COPY_DEFAULT(IndexDescriptor) - - friend bool operator==(const IndexDescriptor& left, const IndexDescriptor& right) { - return left.type() == right.type(); - } + IndexDescriptor(size_t field_count, Type type); + explicit IndexDescriptor(arcticdb::proto::descriptors::IndexDescriptor data); + bool uninitialized() const; + const Proto& proto() const; + size_t field_count() const; + Type type() const; + void set_type(Type type); + friend bool operator==(const IndexDescriptor& left, const IndexDescriptor& right); }; - constexpr IndexDescriptor::TypeChar to_type_char(IndexDescriptor::Type type) { - switch (type) { - case IndexDescriptor::TIMESTAMP:return 'T'; - case IndexDescriptor::ROWCOUNT:return 'R'; - case IndexDescriptor::STRING:return 'S'; - case IndexDescriptor::UNKNOWN:return 'U'; - default:util::raise_rte("Unknown index type: {}", int(type)); - } - } - - constexpr IndexDescriptor::Type from_type_char(IndexDescriptor::TypeChar type) { - switch (type) { - case 'T': return IndexDescriptor::TIMESTAMP; - case 'R': return IndexDescriptor::ROWCOUNT; - case 'S': return IndexDescriptor::STRING; - case 'U': return IndexDescriptor::UNKNOWN; - default:util::raise_rte("Unknown index type: {}", int(type)); - } - } + IndexDescriptor::TypeChar to_type_char(IndexDescriptor::Type type); + IndexDescriptor::Type from_type_char(IndexDescriptor::TypeChar type); + const char* index_type_to_str(IndexDescriptor::Type type); void set_id(arcticdb::proto::descriptors::StreamDescriptor& pb_desc, StreamId id); diff --git a/cpp/arcticdb/pipeline/frame_utils.cpp b/cpp/arcticdb/pipeline/frame_utils.cpp index abd47f87bd..b96fd3f515 100644 --- a/cpp/arcticdb/pipeline/frame_utils.cpp +++ b/cpp/arcticdb/pipeline/frame_utils.cpp @@ -148,8 +148,8 @@ std::pair offset_and_row_count(const std::shared_ptr& frame) { - return !std::holds_alternative(frame->index) || frame->desc.get_sorted() == SortedValue::ASCENDING; +bool index_is_not_timeseries_or_is_sorted_ascending(const pipelines::InputTensorFrame& frame) { + return !std::holds_alternative(frame.index) || frame.desc.get_sorted() == SortedValue::ASCENDING; } } diff --git a/cpp/arcticdb/pipeline/frame_utils.hpp b/cpp/arcticdb/pipeline/frame_utils.hpp index 58e8f8c0ef..c303c67c3d 100644 --- a/cpp/arcticdb/pipeline/frame_utils.hpp +++ b/cpp/arcticdb/pipeline/frame_utils.hpp @@ -311,6 +311,6 @@ size_t get_slice_rowcounts( std::pair offset_and_row_count( const std::shared_ptr& context); -bool index_is_not_timeseries_or_is_sorted_ascending(const std::shared_ptr& frame); +bool index_is_not_timeseries_or_is_sorted_ascending(const pipelines::InputTensorFrame& frame); } //namespace arcticdb diff --git a/cpp/arcticdb/pipeline/index_writer.hpp b/cpp/arcticdb/pipeline/index_writer.hpp index 4ee9104ae3..00d51b465e 100644 --- a/cpp/arcticdb/pipeline/index_writer.hpp +++ b/cpp/arcticdb/pipeline/index_writer.hpp @@ -17,7 +17,7 @@ namespace arcticdb::pipelines::index { // TODO: change the name - something like KeysSegmentWriter or KeyAggragator or better -template, bool> = 0> +template class IndexWriter { // All index segments are row-count indexed in the sense that the keys are // already ordered - they don't need an additional index diff --git a/cpp/arcticdb/pipeline/input_tensor_frame.hpp b/cpp/arcticdb/pipeline/input_tensor_frame.hpp index 55364863b1..69eef7a2b8 100644 --- a/cpp/arcticdb/pipeline/input_tensor_frame.hpp +++ b/cpp/arcticdb/pipeline/input_tensor_frame.hpp @@ -18,14 +18,20 @@ namespace arcticdb::pipelines { using namespace arcticdb::entity; -struct InputTensorFrame { +/// @TODO Move to a separate "util" header +template +concept is_any_of = (std::same_as || ...); + +template +concept ValidIndex = is_any_of< + std::remove_cvref_t>>, + stream::TimeseriesIndex, + stream::RowCountIndex, + stream::TableIndex, + stream::EmptyIndex>; - template - static constexpr bool is_valid_index_v = - std::is_same_v || - std::is_same_v || - std::is_same_v; +struct InputTensorFrame { InputTensorFrame() : index(stream::empty_index()) {} diff --git a/cpp/arcticdb/python/normalization_checks.cpp b/cpp/arcticdb/python/normalization_checks.cpp index d34b806fbd..109c3b4acc 100644 --- a/cpp/arcticdb/python/normalization_checks.cpp +++ b/cpp/arcticdb/python/normalization_checks.cpp @@ -16,17 +16,21 @@ namespace arcticdb { -template -auto get_pandas_common_via_reflection(NormalizationMetadata norm_meta, InnerFunction &&inner_function) --> decltype(inner_function(norm_meta, std::declval(), std::declval())) { +template + auto get_pandas_common_via_reflection( + proto::descriptors::NormalizationMetadata norm_meta, + InnerFunction&& inner_function + ) -> decltype(inner_function(norm_meta, std::declval(), std::declval())) { try { - if (norm_meta.input_type_case() != NormalizationMetadata::INPUT_TYPE_NOT_SET) { - if (auto one_of = NormalizationMetadata::descriptor()->field(norm_meta.input_type_case()); one_of) { + if (norm_meta.input_type_case() != proto::descriptors::NormalizationMetadata::INPUT_TYPE_NOT_SET) { + if (auto one_of = proto::descriptors::NormalizationMetadata::descriptor()->field(norm_meta.input_type_case()); one_of) { log::storage().info("Inefficient NormalizationMetadata.input_type.{} access via reflection", one_of->name()); if (auto msg_type = one_of->message_type(); msg_type) { if (auto common_field = msg_type->FindFieldByName("common"); common_field) { - normalization::check(common_field->message_type() == NormalizationMetadata::Pandas::descriptor(), + normalization::check( + common_field->message_type() == + proto::descriptors::NormalizationMetadata::Pandas::descriptor(), "{}.common must be Pandas", one_of->name()); return inner_function(norm_meta, one_of, common_field); } @@ -40,81 +44,105 @@ auto get_pandas_common_via_reflection(NormalizationMetadata norm_meta, InnerFunc return std::nullopt; } -template std::optional>> -get_common_pandas(const NormalizationMetadata &norm_meta) { +get_common_pandas(const proto::descriptors::NormalizationMetadata& norm_meta) { using Pandas = const arcticdb::proto::descriptors::NormalizationMetadata_Pandas; switch (norm_meta.input_type_case()) { - case NormalizationMetadata::kDf:return std::make_optional(std::reference_wrapper(norm_meta.df().common())); - case NormalizationMetadata::kSeries: - return std::make_optional( - std::reference_wrapper(norm_meta.series().common())); - case NormalizationMetadata::kTs:return std::make_optional(std::reference_wrapper(norm_meta.ts().common())); - - case NormalizationMetadata::kMsgPackFrame: - case NormalizationMetadata::kNp:return std::nullopt; - + case proto::descriptors::NormalizationMetadata::kDf: + return std::make_optional(std::reference_wrapper(norm_meta.df().common())); + case proto::descriptors::NormalizationMetadata::kSeries: + return std::make_optional(std::reference_wrapper(norm_meta.series().common())); + case proto::descriptors::NormalizationMetadata::kTs: + return std::make_optional(std::reference_wrapper(norm_meta.ts().common())); + case proto::descriptors::NormalizationMetadata::kMsgPackFrame: + case proto::descriptors::NormalizationMetadata::kNp: return std::nullopt; default: - return get_pandas_common_via_reflection(norm_meta, [](auto &norm_meta, auto one_of, auto common_field) { - auto &one_of_msg = norm_meta.GetReflection()->GetMessage(norm_meta, one_of); - auto &common_msg = one_of_msg.GetReflection()->GetMessage(one_of_msg, common_field); + return get_pandas_common_via_reflection(norm_meta, [](auto& norm_meta, auto one_of, auto common_field) { + auto& one_of_msg = norm_meta.GetReflection()->GetMessage(norm_meta, one_of); + auto& common_msg = one_of_msg.GetReflection()->GetMessage(one_of_msg, common_field); return std::make_optional(std::reference_wrapper( - *reinterpret_cast(const_cast<::google::protobuf::Message *>(&common_msg)))); + *reinterpret_cast(const_cast<::google::protobuf::Message*>(&common_msg)) + )); }); } } -template std::optional>> -get_common_pandas(NormalizationMetadata - &norm_meta) { +get_common_pandas(proto::descriptors::NormalizationMetadata& norm_meta) { using Pandas = arcticdb::proto::descriptors::NormalizationMetadata_Pandas; - switch (norm_meta. - input_type_case() - ) { - case NormalizationMetadata::kDf: - return - std::make_optional(std::reference_wrapper(*norm_meta.mutable_df()->mutable_common()) - ); - case NormalizationMetadata::kSeries: - return - std::make_optional(std::reference_wrapper(*norm_meta.mutable_series()->mutable_common()) - ); - case NormalizationMetadata::kTs: - return - std::make_optional(std::reference_wrapper(*norm_meta.mutable_ts()->mutable_common()) + switch (norm_meta.input_type_case()) { + case proto::descriptors::NormalizationMetadata::kDf: + return std::make_optional(std::reference_wrapper(*norm_meta.mutable_df()->mutable_common())); + case proto::descriptors::NormalizationMetadata::kSeries: + return std::make_optional(std::reference_wrapper(*norm_meta.mutable_series()->mutable_common())); + case proto::descriptors::NormalizationMetadata::kTs: + return std::make_optional(std::reference_wrapper(*norm_meta.mutable_ts()->mutable_common())); + case proto::descriptors::NormalizationMetadata::kMsgPackFrame: + case proto::descriptors::NormalizationMetadata::kNp: return std::nullopt; + default: + return get_pandas_common_via_reflection(norm_meta, [](auto& norm_meta, auto one_of, auto common_field) { + auto& one_of_msg = norm_meta.GetReflection()->GetMessage(norm_meta, one_of); + auto& common_msg = one_of_msg.GetReflection()->GetMessage(one_of_msg, common_field); + return std::make_optional(std::reference_wrapper( + *reinterpret_cast(const_cast<::google::protobuf::Message*>(&common_msg)) + )); + }); + } +} + +/// In case both indexes are row-ranged sanity checks will be performed: +/// * Both indexes must have the same step +/// * The new index must start at the point where the old one ends +/// If the checks above pass update the new normalization index so that it spans the whole index (old + new) +/// @throws In case the row-ranged indexes are incompatible +void update_rowcount_normalization_data( + const proto::descriptors::NormalizationMetadata& old_norm, + proto::descriptors::NormalizationMetadata& new_norm, + size_t old_length +) { + const auto old_pandas = get_common_pandas(old_norm); + const auto new_pandas = get_common_pandas(new_norm); + const auto* old_index = old_pandas->get().has_index() ? &old_pandas->get().index() : nullptr; + const auto* new_index = new_pandas->get().has_index() ? &new_pandas->get().index() : nullptr; + if (old_index) { + constexpr auto error_suffix = + " the existing version. Please convert both to use Int64Index if you need this to work."; + normalization::check( + old_index->is_physically_stored() == new_index->is_physically_stored(), + "The argument uses a {} index which is incompatible with {}", + new_index->is_physically_stored() ? "non-range" : "range-style", + error_suffix + ); + + if (!old_index->is_physically_stored()) { + normalization::check( + old_index->step() == new_index->step(), + "The new argument has a different RangeIndex step from {}", + error_suffix ); - case NormalizationMetadata::kMsgPackFrame: - case NormalizationMetadata::kNp: - return - std::nullopt; + size_t new_start = new_index->start(); + if (new_start != 0) { + auto stop = old_index->start() + old_length * old_index->step(); + normalization::check( + new_start == stop, + "The appending data has a RangeIndex.start={} that is not contiguous with the {}" + "stop ({}) of", + error_suffix, + new_start, + stop + ); + } - default: - return - get_pandas_common_via_reflection(norm_meta, - []( - auto &norm_meta, - auto one_of, - auto common_field - ) { - auto &one_of_msg = - norm_meta.GetReflection()->GetMessage(norm_meta, one_of); - auto &common_msg = - one_of_msg.GetReflection()->GetMessage(one_of_msg, common_field); - return - std::make_optional - (std::reference_wrapper(*reinterpret_cast(const_cast<::google::protobuf::Message *>(&common_msg))) - ); - } - ); + new_pandas->get().mutable_index()->set_start(old_index->start()); + } } } -template -bool check_pandas_like(const NormalizationMetadata &old_norm, - NormalizationMetadata &new_norm, - size_t old_length) { +bool check_pandas_like( + const proto::descriptors::NormalizationMetadata& old_norm, + proto::descriptors::NormalizationMetadata& new_norm +) { auto old_pandas = get_common_pandas(old_norm); auto new_pandas = get_common_pandas(new_norm); if (old_pandas || new_pandas) { @@ -129,33 +157,6 @@ bool check_pandas_like(const NormalizationMetadata &old_norm, "The argument has an index type incompatible with the existing version:\nexisting={}\nargument={}", util::newlines_to_spaces(old_norm), util::newlines_to_spaces(new_norm)); - - if (old_index) { - constexpr auto - error_suffix = " the existing version. Please convert both to use Int64Index if you need this to work."; - normalization::check(old_index->is_not_range_index() == new_index->is_not_range_index(), - "The argument uses a {} index which is incompatible with {}", - new_index->is_not_range_index() ? "non-range" : "range-style", error_suffix); - - if (!old_index->is_not_range_index()) { - normalization::check(old_index->step() == new_index->step(), - "The new argument has a different RangeIndex step from {}", error_suffix); - - size_t new_start = new_index->start(); - if (new_start != 0) { - auto stop = old_index->start() + old_length * old_index->step(); - normalization::check(new_start == stop, - "The appending data has a RangeIndex.start={} that is not contiguous with the {}" - "stop ({}) of", - error_suffix, - new_start, - stop); - } - - new_pandas->get().mutable_index()->set_start(old_index->start()); - } - } - // FUTURE: check PandasMultiIndex and many other descriptor types. Might be more efficiently implemented using // some structural comparison lib or do it via Python return true; @@ -194,9 +195,14 @@ void fix_normalization_or_throw( const pipelines::InputTensorFrame &new_frame) { auto &old_norm = existing_isr.tsd().proto().normalization(); auto &new_norm = new_frame.norm_meta; - - if (check_pandas_like(old_norm, new_norm, existing_isr.tsd().proto().total_rows())) + if (check_pandas_like(old_norm, new_norm)) { + const IndexDescriptor::Type old_index_type = existing_isr.tsd().proto().stream_descriptor().index().kind(); + const IndexDescriptor::Type new_index_type = new_frame.desc.index().type(); + if (old_index_type == new_index_type && old_index_type == IndexDescriptor::ROWCOUNT) { + update_rowcount_normalization_data(old_norm, new_norm, existing_isr.tsd().proto().total_rows()); + } return; + } if (is_append) { if (check_ndarray_append(old_norm, new_norm)) return; diff --git a/cpp/arcticdb/python/python_to_tensor_frame.cpp b/cpp/arcticdb/python/python_to_tensor_frame.cpp index b7952bcf59..f1a54fd8d5 100644 --- a/cpp/arcticdb/python/python_to_tensor_frame.cpp +++ b/cpp/arcticdb/python/python_to_tensor_frame.cpp @@ -16,15 +16,15 @@ #include namespace arcticdb::convert { -const char none_char[8] = {'\300', '\000', '\000', '\000', '\000', '\000', '\000', '\000'}; +constexpr const char none_char[8] = {'\300', '\000', '\000', '\000', '\000', '\000', '\000', '\000'}; using namespace arcticdb::pipelines; -bool is_unicode(PyObject *obj) { +[[nodiscard]] static inline bool is_unicode(PyObject *obj) { return PyUnicode_Check(obj); } -bool is_py_boolean(PyObject* obj) { +[[nodiscard]] static inline bool is_py_boolean(PyObject* obj) { return PyBool_Check(obj); } @@ -47,7 +47,7 @@ std::variant pystring_to_buffer(PyObject * return api.PyArray_Check_(obj); } -std::tuple determine_python_object_type(PyObject* obj) { +[[nodiscard]] static std::tuple determine_python_object_type(PyObject* obj) { if (is_py_boolean(obj)) { normalization::raise("Nullable booleans are not supported at the moment"); return {ValueType::BOOL_OBJECT, 1, 1}; @@ -62,7 +62,7 @@ std::tuple determine_python_object_type(PyObject* o /// @todo We will iterate over all arrays in a column in aggregator_set_data anyways, so this is redundant, however /// the type is determined at the point when obj_to_tensor is called. We need to make it possible to change the /// the column type in aggregator_set_data in order not to iterate all arrays twice. -std::tuple determine_python_array_type(PyObject** begin, PyObject** end) { +[[nodiscard]] static std::tuple determine_python_array_type(PyObject** begin, PyObject** end) { auto none = py::none{}; while(begin != end) { if(none.ptr() == *begin) { @@ -221,10 +221,7 @@ std::shared_ptr py_ndf_to_frame( "Number idx names {} and values {} do not match", idx_names.size(), idx_vals.size()); - if (idx_names.empty()) { - res->index = stream::RowCountIndex(); - res->desc.set_index_type(IndexDescriptor::ROWCOUNT); - } else { + if (!idx_names.empty()) { util::check(idx_names.size() == 1, "Multi-indexed dataframes not handled"); auto index_tensor = obj_to_tensor(idx_vals[0].ptr(), empty_types); util::check(index_tensor.ndim() == 1, "Multi-dimensional indexes not handled"); @@ -232,10 +229,7 @@ std::shared_ptr py_ndf_to_frame( std::string index_column_name = !idx_names.empty() ? idx_names[0] : "index"; res->num_rows = index_tensor.shape(0); // TODO handle string indexes - // Empty type check is added to preserve the current behavior which is that 0-rowed dataframes - // are assigned datetime index. This will be changed in further PR creating empty typed index. - if (index_tensor.data_type() == DataType::NANOSECONDS_UTC64 || is_empty_type(index_tensor.data_type())) { - + if (index_tensor.data_type() == DataType::NANOSECONDS_UTC64) { res->desc.set_index_field_count(1); res->desc.set_index_type(IndexDescriptor::TIMESTAMP); @@ -268,6 +262,21 @@ std::shared_ptr py_ndf_to_frame( res->field_tensors.push_back(std::move(tensor)); } + // idx_names are passed by the python layer. They are empty in case row count index is used see: + // https://github.com/man-group/ArcticDB/blob/4184a467d9eee90600ddcbf34d896c763e76f78f/python/arcticdb/version_store/_normalization.py#L291 + // Currently the python layers assign RowRange index to both empty dataframes and dataframes wich do not specify + // index explicitly. Thus we handle this case after all columns are read so that we know how many rows are there. + if (idx_names.empty()) { + if (res->num_rows > 0) { + res->index = stream::RowCountIndex(); + res->desc.set_index_type(IndexDescriptor::ROWCOUNT); + } else { + res->index = stream::EmptyIndex(); + res->desc.set_index_type(IndexDescriptor::EMPTY); + } + + } + ARCTICDB_DEBUG(log::version(), "Received frame with descriptor {}", res->desc); res->set_index_range(); return res; diff --git a/cpp/arcticdb/storage/single_file_storage.hpp b/cpp/arcticdb/storage/single_file_storage.hpp index 303106da0f..99a4838c29 100644 --- a/cpp/arcticdb/storage/single_file_storage.hpp +++ b/cpp/arcticdb/storage/single_file_storage.hpp @@ -63,7 +63,7 @@ struct formatter { template auto format(const arcticdb::storage::KeyData &k, FormatContext &ctx) const { - return format_to(ctx.out(), "{}:{}", k.key_offset_, k.key_size_); + return fmt::format_to(ctx.out(), "{}:{}", k.key_offset_, k.key_size_); } }; diff --git a/cpp/arcticdb/stream/aggregator.hpp b/cpp/arcticdb/stream/aggregator.hpp index 434b5fc98d..90d2f9753a 100644 --- a/cpp/arcticdb/stream/aggregator.hpp +++ b/cpp/arcticdb/stream/aggregator.hpp @@ -170,7 +170,9 @@ class Aggregator { // size then you get default buffer sizes. segment_(desc.value_or(schema_policy_.default_descriptor()), row_count.value_or(segmenting_policy_.expected_row_size()), false, SparsePolicy::allow_sparse) { segment_.init_column_map(); - index().check(segment_.descriptor().fields()); + if constexpr (!(std::is_same_v || std::is_same_v)) { + index().check(segment_.descriptor().fields()); + } }; virtual ~Aggregator() = default; diff --git a/cpp/arcticdb/stream/append_map.cpp b/cpp/arcticdb/stream/append_map.cpp index d971499cca..0ed792e691 100644 --- a/cpp/arcticdb/stream/append_map.cpp +++ b/cpp/arcticdb/stream/append_map.cpp @@ -214,7 +214,7 @@ folly::Future write_incomplete_frame( std::optional&& next_key) { using namespace arcticdb::pipelines; - if (!index_is_not_timeseries_or_is_sorted_ascending(frame)) { + if (!index_is_not_timeseries_or_is_sorted_ascending(*frame)) { sorting::raise("When writing/appending staged data in parallel, input data must be sorted."); } diff --git a/cpp/arcticdb/stream/index.hpp b/cpp/arcticdb/stream/index.hpp index 9edf08a60b..ddd881f04a 100644 --- a/cpp/arcticdb/stream/index.hpp +++ b/cpp/arcticdb/stream/index.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include @@ -260,10 +261,6 @@ class RowCountIndex : public BaseIndex { static constexpr IndexDescriptor::Type type() { return IndexDescriptor::ROWCOUNT; } - void check(const FieldCollection& ) const { - // No index defined - } - template static IndexValue start_value_for_segment(const SegmentType &segment) { return static_cast(segment.offset()); @@ -296,10 +293,47 @@ class RowCountIndex : public BaseIndex { static constexpr const char *name() { return "row_count"; } }; -using Index = std::variant; +class EmptyIndex : public BaseIndex { +public: + using TypeDescTag = TypeDescriptorTag, DimensionTag>; + static constexpr size_t field_count() { + return 0; + } + + static constexpr IndexDescriptor::Type type() { + return IndexDescriptor::EMPTY; + } + + static constexpr const char* name() { + return "empty"; + } + + static constexpr EmptyIndex default_index() { + return {}; + } + + [[nodiscard]] static IndexValue start_value_for_segment(const SegmentInMemory& segment) { + return static_cast(segment.offset()); + } + + [[nodiscard]] static IndexValue end_value_for_segment(const SegmentInMemory& segment) { + return static_cast(segment.offset()); + } + + [[nodiscard]] static IndexValue start_value_for_keys_segment(const SegmentInMemory& segment) { + return static_cast(segment.offset()); + } + + [[nodiscard]] static IndexValue end_value_for_keys_segment(const SegmentInMemory& segment) { + return static_cast(segment.offset()); + } +}; + +using Index = std::variant; inline Index index_type_from_descriptor(const StreamDescriptor &desc) { switch (desc.index().proto().kind()) { + case IndexDescriptor::EMPTY: return EmptyIndex{}; case IndexDescriptor::TIMESTAMP: return TimeseriesIndex::make_from_descriptor(desc); case IndexDescriptor::STRING: @@ -312,6 +346,7 @@ inline Index index_type_from_descriptor(const StreamDescriptor &desc) { inline Index default_index_type_from_descriptor(const IndexDescriptor::Proto &desc) { switch (desc.kind()) { + case IndexDescriptor::EMPTY: return EmptyIndex{}; case IndexDescriptor::TIMESTAMP: return TimeseriesIndex::default_index(); case IndexDescriptor::STRING: @@ -326,6 +361,7 @@ inline Index default_index_type_from_descriptor(const IndexDescriptor::Proto &de // Only to be used for visitation to get field count etc as the name is not set inline Index variant_index_from_type(IndexDescriptor::Type type) { switch (type) { + case IndexDescriptor::EMPTY: return EmptyIndex{}; case IndexDescriptor::TIMESTAMP: return TimeseriesIndex{TimeseriesIndex::DefaultName}; case IndexDescriptor::STRING: diff --git a/cpp/arcticdb/stream/protobuf_mappings.hpp b/cpp/arcticdb/stream/protobuf_mappings.hpp index 9fcb0fb567..a47df433c7 100644 --- a/cpp/arcticdb/stream/protobuf_mappings.hpp +++ b/cpp/arcticdb/stream/protobuf_mappings.hpp @@ -36,7 +36,7 @@ inline arcticdb::proto::descriptors::NormalizationMetadata make_rowcount_norm_me auto id = std::get(stream_id); pandas.mutable_common()->set_name(std::move(id)); NormalizationMetadata_PandasIndex pandas_index; - pandas_index.set_is_not_range_index(true); + pandas_index.set_is_physically_stored(true); pandas.mutable_common()->mutable_index()->CopyFrom(pandas_index); norm_meta.mutable_df()->CopyFrom(pandas); return norm_meta; diff --git a/cpp/arcticdb/stream/row_builder.hpp b/cpp/arcticdb/stream/row_builder.hpp index c0c79f44c0..a95b01e053 100644 --- a/cpp/arcticdb/stream/row_builder.hpp +++ b/cpp/arcticdb/stream/row_builder.hpp @@ -53,7 +53,7 @@ class RowBuilder { template void start_row(const Args...args) { reset(); - if constexpr(sizeof...(Args)> 0) { + if constexpr(sizeof...(Args)> 0 && !std::is_same_v) { index().set([&](std::size_t pos, auto arg) { if constexpr (std::is_integral_v || std::is_floating_point_v) set_scalar_impl(pos, arg); diff --git a/cpp/arcticdb/stream/stream_utils.hpp b/cpp/arcticdb/stream/stream_utils.hpp index 95b15a8f5b..66927997e0 100644 --- a/cpp/arcticdb/stream/stream_utils.hpp +++ b/cpp/arcticdb/stream/stream_utils.hpp @@ -376,7 +376,7 @@ inline std::vector get_index_columns_from_descriptor(const Timeseri ssize_t index_till; const auto& common = norm_info.df().common(); if(auto idx_type = common.index_type_case(); idx_type == arcticdb::proto::descriptors::NormalizationMetadata_Pandas::kIndex) - index_till = common.index().is_not_range_index() ? 1 : stream_descriptor.index().field_count(); + index_till = common.index().is_physically_stored() ? 1 : stream_descriptor.index().field_count(); else index_till = 1 + common.multi_index().field_count(); //# The value of field_count is len(index) - 1 @@ -388,7 +388,10 @@ inline std::vector get_index_columns_from_descriptor(const Timeseri } inline IndexRange get_range_from_segment(const Index& index, const SegmentInMemory& segment) { - return util::variant_match(index, [&segment] (auto index_type) { + return util::variant_match( + index, + [](const EmptyIndex&) { return IndexRange{}; }, + [&segment] (auto index_type) { using IndexType = decltype(index_type); auto start = IndexType::start_value_for_segment(segment); auto end = IndexType::end_value_for_segment(segment); diff --git a/cpp/arcticdb/version/schema_checks.hpp b/cpp/arcticdb/version/schema_checks.hpp index 526d9b7fb7..004bd14b3a 100644 --- a/cpp/arcticdb/version/schema_checks.hpp +++ b/cpp/arcticdb/version/schema_checks.hpp @@ -29,42 +29,55 @@ struct StreamDescriptorMismatch : ArcticSpecificException(frame.index); - + const IndexDescriptor::Type old_idx_kind = old_descriptor.index().type(); + const IndexDescriptor::Type new_idx_kind = frame.desc.index().type(); if (operation == UPDATE) { - util::check_rte(old_idx_kind == IndexDescriptor::TIMESTAMP && new_is_timeseries, + const bool new_is_timeseries = std::holds_alternative(frame.index); + util::check_rte( + (old_idx_kind == IndexDescriptor::TIMESTAMP || old_idx_kind == IndexDescriptor::EMPTY) && new_is_timeseries, "Update will not work as expected with a non-timeseries index"); } else { - // TODO: AN-722 - if (new_is_timeseries) { - if (old_idx_kind != IndexDescriptor::TIMESTAMP) { - log::version().warn("Appending a timeseries to a non-timeseries-indexed symbol may create a " - "confusing index and cause problems later"); - } - } else { - if (old_idx_kind != IndexDescriptor::ROWCOUNT) { - // Backwards compatibility - log::version().warn("Appending a non-timeseries-indexed data to a timeseries symbol is highly " - "likely to cause corruption/unexpected behaviour."); - } - } + const IndexDescriptor::Type common_index_type = get_common_index_type(old_idx_kind, new_idx_kind); + normalization::check( + common_index_type != IndexDescriptor::UNKNOWN, + "Cannot append {} index to {} index", + index_type_to_str(new_idx_kind), + index_type_to_str(old_idx_kind) + ); } } -inline bool columns_match(const StreamDescriptor &left, const StreamDescriptor &right) { - if (left.fields().size() != right.fields().size()) +inline bool columns_match(const StreamDescriptor& df_in_store_descriptor, const StreamDescriptor& new_df_descriptor) { + const int right_fields_offset = df_in_store_descriptor.index().type() == IndexDescriptor::EMPTY ? new_df_descriptor.index().field_count() : 0; + // The empty index is compatible with all other index types. Differences in the index fields in this case is + // allowed. The index fields are always the first in the list. + if (df_in_store_descriptor.fields().size() + right_fields_offset != new_df_descriptor.fields().size()) { return false; - - for (auto i = 0; i < int(left.fields().size()); ++i) { - if (left.fields(i).name() != right.fields(i).name()) + } + // In case the left index is empty index we want to skip name/type checking of the index fields which are always + // the first fields. + for (auto i = 0; i < int(df_in_store_descriptor.fields().size()); ++i) { + if (df_in_store_descriptor.fields(i).name() != new_df_descriptor.fields(i + right_fields_offset).name()) return false; - const TypeDescriptor &left_type = left.fields(i).type(); - const TypeDescriptor &right_type = right.fields(i).type(); + const TypeDescriptor& left_type = df_in_store_descriptor.fields(i).type(); + const TypeDescriptor& right_type = new_df_descriptor.fields(i + right_fields_offset).type(); if (!trivially_compatible_types(left_type, right_type) && !(is_empty_type(left_type.data_type()) || is_empty_type(right_type.data_type()))) diff --git a/cpp/arcticdb/version/version_core-inl.hpp b/cpp/arcticdb/version/version_core-inl.hpp index 1dfee5617d..501a88263e 100644 --- a/cpp/arcticdb/version/version_core-inl.hpp +++ b/cpp/arcticdb/version/version_core-inl.hpp @@ -64,10 +64,11 @@ void merge_frames_for_keys_impl( if (std::holds_alternative(query.row_filter)) index_range = std::get(query.row_filter); - auto compare = - [=](const std::unique_ptr &left, const std::unique_ptr &right) { - return pipelines::index::index_value_from_row(left->row(), IndexDescriptor::TIMESTAMP, 0) > pipelines::index::index_value_from_row(right->row(), IndexDescriptor::TIMESTAMP, 0); - }; + auto compare = [](const std::unique_ptr& left, + const std::unique_ptr& right) { + return pipelines::index::index_value_from_row(left->row(), IndexDescriptor::TIMESTAMP, 0) > + pipelines::index::index_value_from_row(right->row(), IndexDescriptor::TIMESTAMP, 0); + }; movable_priority_queue, std::vector>, decltype(compare)> input_streams{compare}; diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 35e954fbe3..5f3f3a56ae 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -102,7 +102,7 @@ folly::Future async_write_dataframe_impl( frame->set_bucketize_dynamic(options.bucketize_dynamic); auto slicing_arg = get_slicing_policy(options, *frame); auto partial_key = IndexPartialKey{frame->desc.id(), version_id}; - if (validate_index && !index_is_not_timeseries_or_is_sorted_ascending(frame)) { + if (validate_index && !index_is_not_timeseries_or_is_sorted_ascending(*frame)) { sorting::raise("When calling write with validate_index enabled, input data must be sorted"); } return write_frame(std::move(partial_key), frame, slicing_arg, store, de_dup_map, sparsify_floats); @@ -111,7 +111,8 @@ folly::Future async_write_dataframe_impl( namespace { IndexDescriptor::Proto check_index_match(const arcticdb::stream::Index& index, const IndexDescriptor::Proto& desc) { if (std::holds_alternative(index)) - util::check(desc.kind() == IndexDescriptor::TIMESTAMP, + util::check( + desc.kind() == IndexDescriptor::TIMESTAMP || desc.kind() == IndexDescriptor::EMPTY, "Index mismatch, cannot update a non-timeseries-indexed frame with a timeseries"); else util::check(desc.kind() == IndexDescriptor::ROWCOUNT, @@ -121,12 +122,12 @@ IndexDescriptor::Proto check_index_match(const arcticdb::stream::Index& index, c } } -void sorted_data_check_append(const std::shared_ptr& frame, index::IndexSegmentReader& index_segment_reader){ +void sorted_data_check_append(const InputTensorFrame& frame, index::IndexSegmentReader& index_segment_reader){ if (!index_is_not_timeseries_or_is_sorted_ascending(frame)) { sorting::raise("When calling append with validate_index enabled, input data must be sorted"); } sorting::check( - !std::holds_alternative(frame->index) || + !std::holds_alternative(frame.index) || index_segment_reader.mutable_tsd().mutable_proto().stream_descriptor().sorted() == arcticdb::proto::descriptors::SortedValue::ASCENDING, "When calling append with validate_index enabled, the existing data must be sorted"); } @@ -145,11 +146,11 @@ folly::Future async_append_impl( bool bucketize_dynamic = index_segment_reader.bucketize_dynamic(); auto row_offset = index_segment_reader.tsd().proto().total_rows(); util::check_rte(!index_segment_reader.is_pickled(), "Cannot append to pickled data"); - if (validate_index) { - sorted_data_check_append(frame, index_segment_reader); - } frame->set_offset(static_cast(row_offset)); fix_descriptor_mismatch_or_throw(APPEND, options.dynamic_schema, index_segment_reader, *frame); + if (validate_index) { + sorted_data_check_append(*frame, index_segment_reader); + } frame->set_bucketize_dynamic(bucketize_dynamic); auto slicing_arg = get_slicing_policy(options, *frame); @@ -329,7 +330,10 @@ VersionedItem update_impl( auto index_segment_reader = index::get_index_reader(*(update_info.previous_index_key_), store); util::check_rte(!index_segment_reader.is_pickled(), "Cannot update pickled data"); auto index_desc = check_index_match(frame->index, index_segment_reader.tsd().proto().stream_descriptor().index()); - util::check(index_desc.kind() == IndexDescriptor::TIMESTAMP, "Update not supported for non-timeseries indexes"); + util::check( + index_desc.kind() == IndexDescriptor::TIMESTAMP || index_desc.kind() == IndexDescriptor::EMPTY, + "Update not supported for non-timeseries indexes" + ); sorted_data_check_update(*frame, index_segment_reader); bool bucketize_dynamic = index_segment_reader.bucketize_dynamic(); (void)check_and_mark_slices(index_segment_reader, dynamic_schema, false, std::nullopt, bucketize_dynamic); @@ -538,7 +542,7 @@ void set_output_descriptors( auto mutable_index = pipeline_context->norm_meta_->mutable_df()->mutable_common()->mutable_index(); mutable_index->set_name(*new_index); mutable_index->clear_fake_name(); - mutable_index->set_is_not_range_index(true); + mutable_index->set_is_physically_stored(true); break; } } diff --git a/cpp/proto/arcticc/pb2/descriptors.proto b/cpp/proto/arcticc/pb2/descriptors.proto index c1cf5a49dc..420e040734 100644 --- a/cpp/proto/arcticc/pb2/descriptors.proto +++ b/cpp/proto/arcticc/pb2/descriptors.proto @@ -57,6 +57,8 @@ message TypeDescriptor { message IndexDescriptor { enum Type { UNKNOWN = 0; + // Used then the dataframe has 0 rows. Convertible to any other type + EMPTY = 69; // 'E' ROWCOUNT = 82; // 'R' STRING = 83; // 'S' TIMESTAMP = 84; // 'T' @@ -122,7 +124,7 @@ message NormalizationMetadata { string name = 1; // RangeIndex are not represented as a field, hence no name is associated string tz = 2; bool fake_name = 3; - bool is_not_range_index = 4; + bool is_physically_stored = 4; int64 start = 5; // Used for RangeIndex int64 step = 6; // Used for RangeIndex bool is_int = 7; diff --git a/python/arcticdb/util/hypothesis.py b/python/arcticdb/util/hypothesis.py index 321f895b0b..680dcac26e 100644 --- a/python/arcticdb/util/hypothesis.py +++ b/python/arcticdb/util/hypothesis.py @@ -197,7 +197,7 @@ class InputFactories(_InputFactoryValues, Enum): ), _ROWCOUNT, "df", - "is_not_range_index", + "is_physically_stored", ) DF_DTI = ( diff --git a/python/arcticdb/version_store/_normalization.py b/python/arcticdb/version_store/_normalization.py index f51dbaabc5..a2dfb73f82 100644 --- a/python/arcticdb/version_store/_normalization.py +++ b/python/arcticdb/version_store/_normalization.py @@ -292,9 +292,7 @@ def _from_tz_timestamp(ts, tz): def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None, string_max_len=None): # index: pd.Index or np.ndarray -> np.ndarray index_tz = None - - if isinstance(index, RangeIndex): - # skip index since we can reconstruct it, so no need to actually store it + if isinstance(index_norm, NormalizationMetadata.PandasIndex) and not index_norm.is_physically_stored: if index.name: if not isinstance(index.name, int) and not isinstance(index.name, str): raise NormalizationException( @@ -303,8 +301,10 @@ def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None if isinstance(index.name, int): index_norm.is_int = True index_norm.name = str(index.name) - index_norm.start = index.start if _range_index_props_are_public else index._start - index_norm.step = index.step if _range_index_props_are_public else index._step + if isinstance(index, RangeIndex): + # skip index since we can reconstruct it, so no need to actually store it + index_norm.start = index.start if _range_index_props_are_public else index._start + index_norm.step = index.step if _range_index_props_are_public else index._step return [], [] else: coerce_type = DTN64_DTYPE if len(index) == 0 else None @@ -363,16 +363,16 @@ def _denormalize_single_index(item, norm_meta): rtn = Index([]) if len(item.index_columns) == 0: # when then initial index was a RangeIndex - if norm_meta.WhichOneof("index_type") == "index" and not norm_meta.index.is_not_range_index: + if norm_meta.WhichOneof("index_type") == "index" and not norm_meta.index.is_physically_stored: if len(item.data) > 0: if hasattr(norm_meta.index, "step") and norm_meta.index.step != 0: stop = norm_meta.index.start + norm_meta.index.step * len(item.data[0]) name = norm_meta.index.name if norm_meta.index.name else None return RangeIndex(start=norm_meta.index.start, stop=stop, step=norm_meta.index.step, name=name) else: - return None + return Index([]) else: - return RangeIndex(start=0, stop=0, step=1) + return Index([]) # this means that the index is not a datetime index and it's been represented as a regular field in the stream item.index_columns.append(item.names.pop(0)) @@ -525,7 +525,11 @@ def denormalize(self, item, norm_meta): class _PandasNormalizer(Normalizer): def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len): index = df.index - if isinstance(index, MultiIndex): + if len(index) == 0 and len(df.select_dtypes(include="category").columns) == 0: + index_norm = pd_norm.index + index_norm.is_physically_stored = False + index = Index([]) + elif isinstance(index, MultiIndex): # This is suboptimal and only a first implementation since it reduplicates the data index_norm = pd_norm.multi_index index_norm.field_count = len(index.levels) - 1 @@ -546,22 +550,12 @@ def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len): df.reset_index(fields, inplace=True) index = df.index else: - n_rows = len(index) - n_categorical_columns = len(df.select_dtypes(include="category").columns) - if IS_PANDAS_TWO and isinstance(index, RangeIndex) and n_rows == 0 and n_categorical_columns == 0: - # In Pandas 1.0, an Index is used by default for any empty dataframe or series, except if - # there are categorical columns in which case a RangeIndex is used. - # - # In Pandas 2.0, RangeIndex is used by default for _any_ empty dataframe or series. - # See: https://github.com/pandas-dev/pandas/issues/49572 - # Yet internally, ArcticDB uses a DatetimeIndex for empty dataframes and series without categorical - # columns. - # - # The index is converted to a DatetimeIndex for preserving the behavior of ArcticDB with Pandas 1.0. - index = DatetimeIndex([]) - + is_not_range_index = not isinstance(index, RangeIndex) + df_has_rows = not(len(index) == 0 and len(df.select_dtypes(include="category").columns) == 0) index_norm = pd_norm.index - index_norm.is_not_range_index = not isinstance(index, RangeIndex) + index_norm.is_physically_stored = is_not_range_index and df_has_rows + if not df_has_rows: + index = Index([]) return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len) @@ -853,7 +847,6 @@ def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_col # type: (DataFrame, Optional[int])->NormalizedInput norm_meta = NormalizationMetadata() norm_meta.df.common.mark = True - if isinstance(item.columns, RangeIndex): norm_meta.df.has_synthetic_columns = True @@ -1106,6 +1099,7 @@ def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_col norm_meta = NormalizationMetadata() norm_meta.ts.mark = True index_norm = norm_meta.ts.common.index + index_norm.is_physically_stored = len(item.times) > 0 and not isinstance(item.times, RangeIndex) index_names, ix_vals = _normalize_single_index( item.times, ["times"], index_norm, dynamic_strings, string_max_len ) diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index 96144cc407..68abc3571e 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -1934,7 +1934,7 @@ def _get_index_columns_from_descriptor(descriptor): # is 0. idx_type = norm_info.df.common.WhichOneof("index_type") if idx_type == "index": - last_index_column_idx = 1 if norm_info.df.common.index.is_not_range_index else 0 + last_index_column_idx = 1 if norm_info.df.common.index.is_physically_stored else 0 else: # The value of field_count is len(index) - 1 last_index_column_idx = 1 + norm_info.df.common.multi_index.field_count @@ -2547,7 +2547,7 @@ def _process_info(self, symbol: str, dit, as_of: Optional[VersionQueryInput] = N else: index_metadata = timeseries_descriptor.normalization.df.common.multi_index - if index_type == "multi_index" or (index_type == "index" and index_metadata.is_not_range_index): + if index_type == "multi_index" or (index_type == "index" and index_metadata.is_physically_stored): index_name_from_store = columns.pop(0) has_fake_name = ( 0 in index_metadata.fake_field_pos if index_type == "multi_index" else index_metadata.fake_name diff --git a/python/tests/unit/arcticdb/version_store/test_empty_column_type.py b/python/tests/unit/arcticdb/version_store/test_empty_column_type.py index 58964763fa..b0311cdbdd 100644 --- a/python/tests/unit/arcticdb/version_store/test_empty_column_type.py +++ b/python/tests/unit/arcticdb/version_store/test_empty_column_type.py @@ -14,18 +14,18 @@ class DtypeGenerator: """ - Class which can generate all dtypes which ArcticDB supports. It can generate them by type category e.g. int, float, - etc. It can also generate a list of all available dtypes + Can generate representative subset of all supported dtypes. Can generate by category (e.g. int, float, etc...) or + all. Generating the full set of dtypes leads to combinatoric explosion in the number of test cases. """ @staticmethod def int_dtype(): - return [t + s for s in ["8", "16", "32", "64"] for t in ["int", "uint"]] + return ["int32", "uint64"] @staticmethod def float_dtype(): - return ["float" + s for s in ["32", "64"]] + return ["float64"] @staticmethod def bool_dtype(): @@ -69,7 +69,7 @@ def dtype(request): yield request.param -@pytest.fixture(params=[pd.RangeIndex(0,0), pd.DatetimeIndex([])]) +@pytest.fixture(params=[pd.RangeIndex(0,0), pd.DatetimeIndex([]), pd.MultiIndex.from_arrays([[],[]], names=["a", "b"])]) def empty_index(request): yield request.param @@ -498,11 +498,11 @@ def test_float(self, lmdb_version_store_static_and_dynamic, float_dtype): ) lmdb_version_store_static_and_dynamic.update( "sym", - pd.DataFrame({"col": [None, np.nan]}, index=self.update_index(), dtype=float_dtype) + pd.DataFrame({"col": [None, np.nan]}, index=self.update_index()) ) assert_frame_equal( lmdb_version_store_static_and_dynamic.read("sym").data, - pd.DataFrame({"col": [1, np.nan, np.nan, 4]}, index=self.index(), dtype=float_dtype) + pd.DataFrame({"col": [1, float("NaN"), np.nan, 4]}, index=self.index(), dtype=float_dtype) ) def test_bool(self, lmdb_version_store_static_and_dynamic, boolean_dtype): @@ -569,6 +569,7 @@ def test_date(self, lmdb_version_store_static_and_dynamic, date_dtype): ) ) + class TestCanAppendToEmptyColumn: """ Tests that it's possible to append to a column which contains no rows. The type of the columns, including the index @@ -576,55 +577,41 @@ class TestCanAppendToEmptyColumn: """ - @pytest.fixture(params= - [ - pytest.param( - pd.RangeIndex(0,3), - marks=pytest.mark.xfail( - reason="Appending row ranged columns to empty columns is not supported yet." - "The index of empty df is of type datetime and it clashes with the row-range type." - "The expected behavior is to allow this as long as the initial df is of 0 rows.") - ), - list(pd.date_range(start="1/1/2024", end="1/3/2024")) - ] - ) + @pytest.fixture(params=[pd.RangeIndex(0,3), list(pd.date_range(start="1/1/2024", end="1/3/2024"))]) def append_index(self, request): yield request.param @pytest.fixture(autouse=True) def create_empty_column(self, lmdb_version_store_static_and_dynamic, dtype, empty_index): - if isinstance(empty_index, pd.RangeIndex) and sys.version_info[1] < 9: - pytest.xfail("""compat-36 and compat-38 tests are failing because this would assign a row-range index to - the empty df. This will be fixed when the pandas-agnostic empty index type is added""") lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) yield - def test_integer(self, lmdb_version_store_static_and_dynamic, int_dtype, empty_index, dtype, append_index): + def test_integer(self, lmdb_version_store_static_and_dynamic, int_dtype, dtype, append_index): df_to_append = pd.DataFrame({"col": [1,2,3]}, dtype=int_dtype, index=append_index) lmdb_version_store_static_and_dynamic.append("sym", df_to_append) assert_frame_equal(lmdb_version_store_static_and_dynamic.read("sym").data, df_to_append) - def test_float(self, lmdb_version_store_static_and_dynamic, float_dtype, empty_index, append_index): + def test_float(self, lmdb_version_store_static_and_dynamic, float_dtype, append_index): df_to_append = pd.DataFrame({"col": [1.0,2.0,3.0]}, dtype=float_dtype, index=append_index) lmdb_version_store_static_and_dynamic.append("sym", df_to_append) assert_frame_equal(lmdb_version_store_static_and_dynamic.read("sym").data, df_to_append) - def test_bool(self, lmdb_version_store_static_and_dynamic, boolean_dtype, empty_index, append_index): + def test_bool(self, lmdb_version_store_static_and_dynamic, boolean_dtype, append_index): df_to_append = pd.DataFrame({"col": [True, False, None]}, dtype=boolean_dtype, index=append_index) lmdb_version_store_static_and_dynamic.append("sym", df_to_append) assert_frame_equal(lmdb_version_store_static_and_dynamic.read("sym").data, df_to_append) - def test_empty(self, lmdb_version_store_static_and_dynamic, empty_index, append_index): + def test_nones(self, lmdb_version_store_static_and_dynamic, append_index): df_to_append = pd.DataFrame({"col": [None, None, None]}, index=append_index) lmdb_version_store_static_and_dynamic.append("sym", df_to_append) assert_frame_equal(lmdb_version_store_static_and_dynamic.read("sym").data, df_to_append) - def test_string(self, lmdb_version_store_static_and_dynamic, empty_index, append_index): + def test_string(self, lmdb_version_store_static_and_dynamic, append_index): df_to_append = pd.DataFrame({"col": ["short_string", None, 20 * "long_string"]}, index=append_index) lmdb_version_store_static_and_dynamic.append("sym", df_to_append) assert_frame_equal(lmdb_version_store_static_and_dynamic.read("sym").data, df_to_append) - def test_date(self, lmdb_version_store_static_and_dynamic, date_dtype, empty_index, append_index): + def test_date(self, lmdb_version_store_static_and_dynamic, date_dtype, append_index): df_to_append = pd.DataFrame( { "col": np.array( @@ -642,58 +629,81 @@ def test_date(self, lmdb_version_store_static_and_dynamic, date_dtype, empty_ind assert_frame_equal(lmdb_version_store_static_and_dynamic.read("sym").data, df_to_append) -class TestAppendingEmptyToColumnDoesNothing: +class TestAppendAndUpdateWithEmptyToColumnDoesNothing: """ - Test if it is possible to append empty column to an already existing column. The append should not change anything. - If dynamic schema is used the update should not create new columns. The update should not even reach the C++ layer. + Test if it is possible to append/update empty column to an already existing column. The append/update should not + change anything. If dynamic schema is used the append/update should not create new columns. The append/update + should not even reach the C++ layer and the version should not be changed. """ - @pytest.fixture(params=[pd.RangeIndex(0,3), list(pd.date_range(start="1/1/2024", end="1/3/2024"))]) def index(self, request): yield request.param - def test_integer(self, lmdb_version_store_static_and_dynamic, index, int_dtype, empty_index, dtype): + @pytest.fixture() + def empty_dataframe(self, empty_index, dtype): + yield pd.DataFrame({"col": []}, dtype=dtype, index=empty_index) + + @staticmethod + def assert_append_empty_does_nothing(initial_df, store, empty): + store.append("sym", empty) + read_result = store.read("sym") + assert_frame_equal(read_result.data, initial_df) + assert read_result.version == 0 + + @staticmethod + def assert_update_empty_does_nothing(initial_df, store, empty): + store.update("sym", empty) + read_result = store.read("sym") + assert_frame_equal(read_result.data, initial_df) + assert read_result.version == 0 + + def test_integer(self, lmdb_version_store_static_and_dynamic, index, int_dtype, empty_dataframe): df = pd.DataFrame({"col": [1,2,3]}, dtype=int_dtype, index=index) lmdb_version_store_static_and_dynamic.write("sym", df) - lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) - read_result = lmdb_version_store_static_and_dynamic.read("sym") - assert_frame_equal(read_result.data, df) - assert read_result.version == 0 + self.assert_append_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) + self.assert_update_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) - def test_float(self, lmdb_version_store_static_and_dynamic, index, float_dtype, empty_index, dtype): + def test_float(self, lmdb_version_store_static_and_dynamic, index, float_dtype, empty_dataframe): df = pd.DataFrame({"col": [1,2,3]}, dtype=float_dtype, index=index) lmdb_version_store_static_and_dynamic.write("sym", df) - lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) - read_result = lmdb_version_store_static_and_dynamic.read("sym") - assert_frame_equal(read_result.data, df) - assert read_result.version == 0 + self.assert_append_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) + self.assert_update_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) - def test_bool(self, lmdb_version_store_static_and_dynamic, index, boolean_dtype, empty_index, dtype): + def test_bool(self, lmdb_version_store_static_and_dynamic, index, boolean_dtype, empty_dataframe): df = pd.DataFrame({"col": [False, True, None]}, dtype=boolean_dtype, index=index) lmdb_version_store_static_and_dynamic.write("sym", df) - lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) - read_result = lmdb_version_store_static_and_dynamic.read("sym") - assert_frame_equal(read_result.data, df) - assert read_result.version == 0 + self.assert_append_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) + self.assert_update_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) - def test_empty(self, lmdb_version_store_static_and_dynamic, index, empty_index, dtype): + def test_nones(self, lmdb_version_store_static_and_dynamic, index, empty_dataframe): df = pd.DataFrame({"col": [None, None, None]}, index=index) lmdb_version_store_static_and_dynamic.write("sym", df) - lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) - read_result = lmdb_version_store_static_and_dynamic.read("sym") - assert_frame_equal(read_result.data, df) - assert read_result.version == 0 + self.assert_append_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) + self.assert_update_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) - def test_string(self, lmdb_version_store_static_and_dynamic, index, empty_index, dtype): - df = pd.DataFrame({"col": ["shord", 20*"long", None]}, index=index) + @pytest.mark.parametrize("initial_empty_index", [pd.RangeIndex(0,0), pd.DatetimeIndex([])]) + def test_empty(self, lmdb_version_store_static_and_dynamic, initial_empty_index, empty_dataframe): + df = pd.DataFrame({"col": []}, index=initial_empty_index) lmdb_version_store_static_and_dynamic.write("sym", df) - lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) - read_result = lmdb_version_store_static_and_dynamic.read("sym") - assert_frame_equal(read_result.data, df) - assert read_result.version == 0 + self.assert_append_empty_does_nothing( + lmdb_version_store_static_and_dynamic.read("sym").data, + lmdb_version_store_static_and_dynamic, + empty_dataframe + ) + self.assert_update_empty_does_nothing( + lmdb_version_store_static_and_dynamic.read("sym").data, + lmdb_version_store_static_and_dynamic, + empty_dataframe + ) + + def test_string(self, lmdb_version_store_static_and_dynamic, index, empty_dataframe): + df = pd.DataFrame({"col": ["short", 20*"long", None]}, index=index) + lmdb_version_store_static_and_dynamic.write("sym", df) + self.assert_append_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) + self.assert_update_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) - def test_date(self, lmdb_version_store_static_and_dynamic, date_dtype, index, empty_index, dtype): + def test_date(self, lmdb_version_store_static_and_dynamic, date_dtype, index, empty_dataframe): df = pd.DataFrame( { "col": np.array( @@ -705,10 +715,8 @@ def test_date(self, lmdb_version_store_static_and_dynamic, date_dtype, index, em ) }, dtype=date_dtype, index=index) lmdb_version_store_static_and_dynamic.write("sym", df) - lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) - read_result = lmdb_version_store_static_and_dynamic.read("sym") - assert_frame_equal(read_result.data, df) - assert read_result.version == 0 + self.assert_append_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) + self.assert_update_empty_does_nothing(df, lmdb_version_store_static_and_dynamic, empty_dataframe) def test_empty_df_does_not_create_new_columns_in_dynamic_schema(self, lmdb_version_store_dynamic_schema, index): df = pd.DataFrame({"col": [1,2,3]}, dtype="int32", index=index) @@ -735,9 +743,6 @@ class TestCanUpdateEmptyColumn: @pytest.fixture(autouse=True) def create_empty_column(self, lmdb_version_store_static_and_dynamic, dtype, empty_index): - if isinstance(empty_index, pd.RangeIndex) and sys.version_info[1] < 9: - pytest.xfail("""compat-36 and compat-38 tests are failing because this would assign a row-range index to - the empty df. This will be fixed when the pandas-agnostic empty index type is added""") lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []}, dtype=dtype, index=empty_index)) yield @@ -784,10 +789,31 @@ def test_date(self, lmdb_version_store_static_and_dynamic, date_dtype): lmdb_version_store_static_and_dynamic.update("sym", df) assert_frame_equal(lmdb_version_store_static_and_dynamic.read("sym").data, df) -class TestEmptyTypesIsOverriden: + +class TestEmptyTypeIsOverriden: + """ + When an empty column (or a column containing only None) values is initially written it is assigned the empty type. + The first write/update to change the type determines the actual type of the column. Test that the first non-empty + append determines the actual type and subsequent appends with different types fail. + """ + + def test_cannot_append_different_type_after_first_not_none(self, lmdb_version_store_static_and_dynamic): lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": [None, None]})) lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": [1, 2, 3]})) lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": [None, None]})) with pytest.raises(Exception): - lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": ["some", "string"]})) \ No newline at end of file + lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": ["some", "string"]})) + + @pytest.mark.parametrize( + "incompatible_indexes", + [ + (pd.RangeIndex(0,3), list(pd.date_range(start="1/1/2024", end="1/3/2024"))), + (list(pd.date_range(start="1/1/2024", end="1/3/2024")), pd.RangeIndex(0, 3)) + ] + ) + def test_cannot_append_different_index_type_after_first_non_empty(self, lmdb_version_store_static_and_dynamic, incompatible_indexes): + lmdb_version_store_static_and_dynamic.write("sym", pd.DataFrame({"col": []})) + lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": [1,2,3]}, index=incompatible_indexes[0])) + with pytest.raises(Exception): + lmdb_version_store_static_and_dynamic.append("sym", pd.DataFrame({"col": [4, 5, 6]}, index=incompatible_indexes[1])) \ No newline at end of file