diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index fc34d7c0d1..e631998773 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -461,6 +461,7 @@ set(arcticdb_srcs storage/storage_factory.cpp stream/aggregator.cpp stream/append_map.cpp + stream/index.cpp stream/piloted_clock.cpp toolbox/library_tool.cpp util/allocator.cpp diff --git a/cpp/arcticdb/stream/index.cpp b/cpp/arcticdb/stream/index.cpp new file mode 100644 index 0000000000..7c97fe259d --- /dev/null +++ b/cpp/arcticdb/stream/index.cpp @@ -0,0 +1,252 @@ +/* Copyright 2024 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software + * will be governed by the Apache License, version 2.0. + */ + +#include +#include +#include +#include + + +namespace arcticdb::stream { + +IndexDescriptor::Type get_index_value_type(const AtomKey& key) { + return std::holds_alternative(key.start_index()) ? 
IndexDescriptor::TIMESTAMP + : IndexDescriptor::STRING; + } + +template + StreamDescriptor BaseIndex::create_stream_descriptor( + StreamId stream_id, + std::initializer_list fields +) const { + std::vector fds{fields}; + return create_stream_descriptor(stream_id, folly::range(fds)); +} + +template const Derived* BaseIndex::derived() const { + return static_cast(this); +} + +template BaseIndex::operator IndexDescriptor() const { + return {Derived::field_count(), Derived::type()}; +} + +template FieldRef BaseIndex::field(size_t) const { + return {static_cast(typename Derived::TypeDescTag{}), std::string_view(derived()->name())}; +} + +TimeseriesIndex::TimeseriesIndex(const std::string& name) : name_(name) {} + +TimeseriesIndex TimeseriesIndex::default_index() { + return TimeseriesIndex(DefaultName); +} + +void TimeseriesIndex::check(const FieldCollection& fields) const { + const size_t fields_size = fields.size(); + constexpr int current_fields_size = int(field_count()); + + const TypeDescriptor& first_field_type = fields[0].type(); + const TypeDescriptor& current_first_field_type = this->field(0).type(); + + const bool valid_type_promotion = has_valid_type_promotion(first_field_type, current_first_field_type).has_value(); + const bool trivial_type_compatibility = trivially_compatible_types(first_field_type, current_first_field_type); + + const bool compatible_types = valid_type_promotion || trivial_type_compatibility; + + util::check_arg( + fields_size >= current_fields_size, + "expected at least {} fields, actual {}", + current_fields_size, + fields_size + ); + util::check_arg(compatible_types, "expected field[0]={}, actual {}", this->field(0), fields[0]); +} + +IndexValue TimeseriesIndex::start_value_for_segment(const SegmentInMemory& segment) { + if (segment.row_count() == 0) + return {NumericIndex{0}}; + auto first_ts = segment.template scalar_at(0, 0).value(); + return {first_ts}; +} + +IndexValue TimeseriesIndex::end_value_for_segment(const SegmentInMemory& 
segment) { + auto row_count = segment.row_count(); + if (row_count == 0) + return {NumericIndex{0}}; + auto last_ts = segment.template scalar_at(row_count - 1, 0).value(); + return {last_ts}; +} + +IndexValue TimeseriesIndex::start_value_for_keys_segment(const SegmentInMemory& segment) { + if (segment.row_count() == 0) + return {NumericIndex{0}}; + auto start_index_id = int(pipelines::index::Fields::start_index); + auto first_ts = segment.template scalar_at(0, start_index_id).value(); + return {first_ts}; +} + +IndexValue TimeseriesIndex::end_value_for_keys_segment(const SegmentInMemory& segment) { + auto row_count = segment.row_count(); + if (row_count == 0) + return {NumericIndex{0}}; + auto end_index_id = int(pipelines::index::Fields::end_index); + auto last_ts = segment.template scalar_at(row_count - 1, end_index_id).value(); + return {last_ts}; +} + +const char* TimeseriesIndex::name() const { + return name_.c_str(); +} + +TimeseriesIndex TimeseriesIndex::make_from_descriptor(const StreamDescriptor& desc) { + if (desc.field_count() > 0) + return TimeseriesIndex(std::string(desc.fields(0).name())); + + return TimeseriesIndex(DefaultName); +} + + +TableIndex::TableIndex(const std::string& name) : name_(name) { +} + +TableIndex TableIndex::default_index() { + return TableIndex(DefaultName); +} + +void TableIndex::check(const FieldCollection& fields) const { + util::check_arg( + fields.size() >= int(field_count()), + "expected at least {} fields, actual {}", + field_count(), + fields.size() + ); + + util::check(fields.ref_at(0) == field(0), "Field descriptor mismatch {} != {}", fields.ref_at(0), field(0)); +} + +IndexValue TableIndex::start_value_for_segment(const SegmentInMemory& segment) { + auto string_index = segment.string_at(0, 0).value(); + return {std::string{string_index}}; +} + +IndexValue TableIndex::end_value_for_segment(const SegmentInMemory& segment) { + auto last_rowid = segment.row_count() - 1; + auto string_index = segment.string_at(last_rowid, 
0).value(); + return {std::string{string_index}}; +} + +IndexValue TableIndex::start_value_for_keys_segment(const SegmentInMemory& segment) { + if (segment.row_count() == 0) + return {NumericIndex{0}}; + auto start_index_id = int(pipelines::index::Fields::start_index); + auto string_index = segment.string_at(0, start_index_id).value(); + return {std::string{string_index}}; +} + +IndexValue TableIndex::end_value_for_keys_segment(const SegmentInMemory& segment) { + auto row_count = segment.row_count(); + if (row_count == 0) + return {NumericIndex{0}}; + auto end_index_id = int(pipelines::index::Fields::end_index); + auto string_index = segment.string_at(row_count - 1, end_index_id).value(); + return {std::string{string_index}}; +} + +TableIndex TableIndex::make_from_descriptor(const StreamDescriptor& desc) { + if (desc.field_count() > 0) + return TableIndex(std::string(desc.field(0).name())); + + return TableIndex(DefaultName); +} + +const char* TableIndex::name() const { + return name_.c_str(); +} + +RowCountIndex RowCountIndex::default_index() { + return RowCountIndex{}; +} + + +IndexValue RowCountIndex::start_value_for_segment(const SegmentInMemory& segment) { + return static_cast(segment.offset()); +} + +IndexValue RowCountIndex::end_value_for_segment(const SegmentInMemory& segment) { + return static_cast(segment.offset() + (segment.row_count() - 1)); +} + +IndexValue RowCountIndex::start_value_for_keys_segment(const SegmentInMemory& segment) { + return static_cast(segment.offset()); +} + +IndexValue RowCountIndex::end_value_for_keys_segment(const SegmentInMemory& segment) { + return static_cast(segment.offset() + (segment.row_count() - 1)); +} + +RowCountIndex RowCountIndex::make_from_descriptor(const StreamDescriptor&) const { + return RowCountIndex::default_index(); +} + +IndexValue EmptyIndex::start_value_for_segment(const SegmentInMemory& segment) { + return static_cast(segment.offset()); +} + +IndexValue EmptyIndex::end_value_for_segment(const 
SegmentInMemory& segment) { + return static_cast(segment.offset()); +} + +IndexValue EmptyIndex::start_value_for_keys_segment(const SegmentInMemory& segment) { + return static_cast(segment.offset()); +} + +IndexValue EmptyIndex::end_value_for_keys_segment(const SegmentInMemory& segment) { + return static_cast(segment.offset()); +} + +Index index_type_from_descriptor(const StreamDescriptor& desc) { + switch (desc.index().proto().kind()) { + case IndexDescriptor::EMPTY: return EmptyIndex{}; + case IndexDescriptor::TIMESTAMP: return TimeseriesIndex::make_from_descriptor(desc); + case IndexDescriptor::STRING: return TableIndex::make_from_descriptor(desc); + case IndexDescriptor::ROWCOUNT: return RowCountIndex{}; + default: + util::raise_rte( + "Data obtained from storage refers to an index type that this build of ArcticDB doesn't understand ({}).", + int(desc.index().proto().kind()) + ); + } +} + +Index default_index_type_from_descriptor(const IndexDescriptor::Proto& desc) { + switch (desc.kind()) { + case IndexDescriptor::EMPTY: return EmptyIndex{}; + case IndexDescriptor::TIMESTAMP: return TimeseriesIndex::default_index(); + case IndexDescriptor::STRING: return TableIndex::default_index(); + case IndexDescriptor::ROWCOUNT: return RowCountIndex::default_index(); + default: util::raise_rte("Unknown index type {} trying to generate index type", int(desc.kind())); + } +} + +Index default_index_type_from_descriptor(const IndexDescriptor& desc) { + return default_index_type_from_descriptor(desc.proto()); +} + +IndexDescriptor get_descriptor_from_index(const Index& index) { + return util::variant_match(index, [](const auto& idx) { return static_cast(idx); }); +} + +Index empty_index() { + return RowCountIndex::default_index(); +} + +template class BaseIndex; +template class BaseIndex; +template class BaseIndex; +template class BaseIndex; +} \ No newline at end of file diff --git a/cpp/arcticdb/stream/index.hpp b/cpp/arcticdb/stream/index.hpp index ddd881f04a..377e52e1c9 
100644 --- a/cpp/arcticdb/stream/index.hpp +++ b/cpp/arcticdb/stream/index.hpp @@ -9,51 +9,32 @@ #include #include -#include #include #include -#include #include -#include -#include -#include + +namespace arcticdb { + class SegmentInMemory; +} namespace arcticdb::stream { using namespace arcticdb::entity; -inline IndexDescriptor::Type get_index_value_type(const AtomKey &key) { - return std::holds_alternative(key.start_index()) ? IndexDescriptor::TIMESTAMP : IndexDescriptor::STRING; -} +IndexDescriptor::Type get_index_value_type(const AtomKey& key); -template +template class BaseIndex { - public: - template - StreamDescriptor create_stream_descriptor(StreamId stream_id, RangeType&& fields) const { +public: + template StreamDescriptor create_stream_descriptor(StreamId stream_id, RangeType&& fields) const { return stream_descriptor(stream_id, *derived(), std::move(fields)); } - [[nodiscard]] StreamDescriptor create_stream_descriptor( - StreamId stream_id, - std::initializer_list fields) const { - std::vector fds{fields}; - return create_stream_descriptor(stream_id, folly::range(fds)); - - } - - [[nodiscard]] const Derived* derived() const { - return static_cast(this); - } - - explicit operator IndexDescriptor() const { - return {Derived::field_count(), Derived::type()}; - } - - [[nodiscard]] FieldRef field(size_t) const { - return {static_cast(typename Derived::TypeDescTag{}), std::string_view(derived()->name())}; - } + [[nodiscard]] StreamDescriptor create_stream_descriptor(StreamId stream_id, std::initializer_list fields) const; + [[nodiscard]] const Derived* derived() const; + explicit operator IndexDescriptor() const; + [[nodiscard]] FieldRef field(size_t) const; }; //TODO make this into just a numeric index, of which timestamp is a special case @@ -61,14 +42,6 @@ class TimeseriesIndex : public BaseIndex { public: static constexpr const char* DefaultName = "time" ; - explicit TimeseriesIndex(const std::string& name) : - name_(name) { - } - - static TimeseriesIndex 
default_index() { - return TimeseriesIndex(DefaultName); - } - using TypeDescTag = TypeDescriptorTag< DataTypeTag, DimensionTag>; @@ -80,63 +53,19 @@ class TimeseriesIndex : public BaseIndex { static constexpr IndexDescriptor::Type type() { return IndexDescriptor::TIMESTAMP; } + TimeseriesIndex(const std::string& name); + static TimeseriesIndex default_index(); + void check(const FieldCollection& fields) const; + static IndexValue start_value_for_segment(const SegmentInMemory& segment); + static IndexValue end_value_for_segment(const SegmentInMemory& segment); + static IndexValue start_value_for_keys_segment(const SegmentInMemory& segment); + static IndexValue end_value_for_keys_segment(const SegmentInMemory& segment); - void check(const FieldCollection &fields) const { - const size_t fields_size = fields.size(); - constexpr int current_fields_size = int(field_count()); - - const TypeDescriptor &first_field_type = fields[0].type(); - const TypeDescriptor &current_first_field_type = this->field(0).type(); - - const bool valid_type_promotion = has_valid_type_promotion(first_field_type, current_first_field_type).has_value(); - const bool trivial_type_compatibility = trivially_compatible_types(first_field_type, current_first_field_type); - - const bool compatible_types = valid_type_promotion || trivial_type_compatibility; - - util::check_arg(fields_size >= current_fields_size, "expected at least {} fields, actual {}", - current_fields_size, fields_size); - util::check_arg(compatible_types, "expected field[0]={}, actual {}", - this->field(0), fields[0]); - } - - template - static IndexValue start_value_for_segment(const SegmentType &segment) { - if (segment.row_count() == 0) - return { NumericIndex{0} }; - auto first_ts = segment.template scalar_at(0, 0).value(); - return {first_ts}; - } - - template - static IndexValue end_value_for_segment(const SegmentType &segment) { - auto row_count = segment.row_count(); - if (row_count == 0) - return { NumericIndex{0} }; - auto last_ts 
= segment.template scalar_at(row_count - 1, 0).value(); - return {last_ts}; - } - - template - static IndexValue start_value_for_keys_segment(const SegmentType &segment) { - if (segment.row_count() == 0) - return { NumericIndex{0} }; - auto start_index_id = int(pipelines::index::Fields::start_index); - auto first_ts = segment.template scalar_at(0, start_index_id).value(); - return {first_ts}; - } - - template - static IndexValue end_value_for_keys_segment(const SegmentType &segment) { - auto row_count = segment.row_count(); - if (row_count == 0) - return { NumericIndex{0} }; - auto end_index_id = int(pipelines::index::Fields::end_index); - auto last_ts = segment.template scalar_at(row_count - 1, end_index_id).value(); - return {last_ts}; - } + [[nodiscard]] const char* name() const; + static TimeseriesIndex make_from_descriptor(const StreamDescriptor& desc); - template - void set(RowCellSetter setter, const IndexValue &index_value) { + template + void set(RowCellSetter setter, const IndexValue& index_value) { if (std::holds_alternative(index_value)) { auto ts = std::get(index_value); util::check_arg(ts >= ts_, "timestamp decreasing, current val={}, candidate={}", ts_, ts); @@ -146,15 +75,6 @@ class TimeseriesIndex : public BaseIndex { util::raise_rte("Cannot set this type, expecting timestamp"); } - [[nodiscard]] const char *name() const { return name_.c_str(); } - - static TimeseriesIndex make_from_descriptor(const StreamDescriptor& desc) { - if(desc.field_count() > 0) - return TimeseriesIndex(std::string(desc.fields(0).name())); - - return TimeseriesIndex(DefaultName); - } - private: std::string name_; timestamp ts_ = 0; @@ -164,13 +84,9 @@ class TableIndex : public BaseIndex { public: static constexpr const char* DefaultName = "Key"; - explicit TableIndex(const std::string& name) : - name_(name) { - } + explicit TableIndex(const std::string& name); - static TableIndex default_index() { - return TableIndex(DefaultName); - } + static TableIndex default_index(); 
using TypeDescTag = TypeDescriptorTag< DataTypeTag, @@ -184,45 +100,15 @@ class TableIndex : public BaseIndex { return IndexDescriptor::STRING; } - void check(const FieldCollection &fields) const { - util::check_arg(fields.size() >= int(field_count()), "expected at least {} fields, actual {}", - field_count(), fields.size()); + void check(const FieldCollection& fields) const; - util::check(fields.ref_at(0) == field(0), - "Field descriptor mismatch {} != {}", fields.ref_at(0), field(0)); - } + static IndexValue start_value_for_segment(const SegmentInMemory& segment); - template - static IndexValue start_value_for_segment(const SegmentType &segment) { - auto string_index = segment.string_at(0, 0).value(); - return {std::string{string_index}}; - } + static IndexValue end_value_for_segment(const SegmentInMemory& segment); - template - static IndexValue end_value_for_segment(const SegmentType &segment) { - auto last_rowid = segment.row_count() - 1; - auto string_index = segment.string_at(last_rowid, 0).value(); - return {std::string{string_index}}; - } + static IndexValue start_value_for_keys_segment(const SegmentInMemory& segment); - template - static IndexValue start_value_for_keys_segment(const SegmentType &segment) { - if (segment.row_count() == 0) - return { NumericIndex{0} }; - auto start_index_id = int(pipelines::index::Fields::start_index); - auto string_index = segment.string_at(0, start_index_id).value(); - return {std::string{string_index}}; - } - - template - static IndexValue end_value_for_keys_segment(const SegmentType &segment) { - auto row_count = segment.row_count(); - if (row_count == 0) - return { NumericIndex{0} }; - auto end_index_id = int(pipelines::index::Fields::end_index); - auto string_index = segment.string_at(row_count - 1, end_index_id).value(); - return {std::string{string_index}}; - } + static IndexValue end_value_for_keys_segment(const SegmentInMemory& segment); template void set(RowCellSetter setter, const IndexValue &index_value) const 
{ @@ -232,14 +118,9 @@ class TableIndex : public BaseIndex { util::raise_rte("Cannot set this type. Expecting std::string"); } - static TableIndex make_from_descriptor(const StreamDescriptor& desc) { - if(desc.field_count() > 0) - return TableIndex(std::string(desc.field(0).name())); - - return TableIndex(DefaultName); - } + static TableIndex make_from_descriptor(const StreamDescriptor& desc); - const char *name() const { return name_.c_str(); } + const char* name() const; private: std::string name_; @@ -253,42 +134,26 @@ class RowCountIndex : public BaseIndex { RowCountIndex() = default; - static RowCountIndex default_index() { - return RowCountIndex{}; - } + static RowCountIndex default_index(); static constexpr size_t field_count() { return 0; } static constexpr IndexDescriptor::Type type() { return IndexDescriptor::ROWCOUNT; } - template - static IndexValue start_value_for_segment(const SegmentType &segment) { - return static_cast(segment.offset()); - } + static IndexValue start_value_for_segment(const SegmentInMemory& segment); - template - static IndexValue end_value_for_segment(const SegmentType &segment) { - return static_cast(segment.offset() + (segment.row_count() - 1)); - } + static IndexValue end_value_for_segment(const SegmentInMemory& segment); - template - static IndexValue start_value_for_keys_segment(const SegmentType &segment) { - return static_cast(segment.offset()); - } + static IndexValue start_value_for_keys_segment(const SegmentInMemory& segment); - template - static IndexValue end_value_for_keys_segment(const SegmentType &segment) { - return static_cast(segment.offset() + (segment.row_count() - 1)); - } + static IndexValue end_value_for_keys_segment(const SegmentInMemory& segment); template void set(RowCellSetter, const IndexValue & = {timestamp(0)}) { // No index value } - RowCountIndex make_from_descriptor(const StreamDescriptor&) const { - return RowCountIndex::default_index(); - } + RowCountIndex make_from_descriptor(const 
StreamDescriptor&) const; static constexpr const char *name() { return "row_count"; } }; @@ -312,80 +177,22 @@ class EmptyIndex : public BaseIndex { return {}; } - [[nodiscard]] static IndexValue start_value_for_segment(const SegmentInMemory& segment) { - return static_cast(segment.offset()); - } - - [[nodiscard]] static IndexValue end_value_for_segment(const SegmentInMemory& segment) { - return static_cast(segment.offset()); - } - - [[nodiscard]] static IndexValue start_value_for_keys_segment(const SegmentInMemory& segment) { - return static_cast(segment.offset()); - } - - [[nodiscard]] static IndexValue end_value_for_keys_segment(const SegmentInMemory& segment) { - return static_cast(segment.offset()); - } + [[nodiscard]] static IndexValue start_value_for_segment(const SegmentInMemory& segment); + [[nodiscard]] static IndexValue end_value_for_segment(const SegmentInMemory& segment); + [[nodiscard]] static IndexValue start_value_for_keys_segment(const SegmentInMemory& segment); + [[nodiscard]] static IndexValue end_value_for_keys_segment(const SegmentInMemory& segment); }; using Index = std::variant; -inline Index index_type_from_descriptor(const StreamDescriptor &desc) { - switch (desc.index().proto().kind()) { - case IndexDescriptor::EMPTY: return EmptyIndex{}; - case IndexDescriptor::TIMESTAMP: - return TimeseriesIndex::make_from_descriptor(desc); - case IndexDescriptor::STRING: - return TableIndex::make_from_descriptor(desc); - case IndexDescriptor::ROWCOUNT: - return RowCountIndex{}; - default:util::raise_rte("Data obtained from storage refers to an index type that this build of ArcticDB doesn't understand ({}).", int(desc.index().proto().kind())); - } -} - -inline Index default_index_type_from_descriptor(const IndexDescriptor::Proto &desc) { - switch (desc.kind()) { - case IndexDescriptor::EMPTY: return EmptyIndex{}; - case IndexDescriptor::TIMESTAMP: - return TimeseriesIndex::default_index(); - case IndexDescriptor::STRING: - return 
TableIndex::default_index(); - case IndexDescriptor::ROWCOUNT: - return RowCountIndex::default_index(); - default: - util::raise_rte("Unknown index type {} trying to generate index type", int(desc.kind())); - } -} +Index index_type_from_descriptor(const StreamDescriptor& desc); +Index default_index_type_from_descriptor(const IndexDescriptor::Proto& desc); // Only to be used for visitation to get field count etc as the name is not set -inline Index variant_index_from_type(IndexDescriptor::Type type) { - switch (type) { - case IndexDescriptor::EMPTY: return EmptyIndex{}; - case IndexDescriptor::TIMESTAMP: - return TimeseriesIndex{TimeseriesIndex::DefaultName}; - case IndexDescriptor::STRING: - return TableIndex{TableIndex::DefaultName}; - case IndexDescriptor::ROWCOUNT: - return RowCountIndex{}; - default: - util::raise_rte("Unknown index type {} trying to generate index type", int(type)); - } -} - -inline Index default_index_type_from_descriptor(const IndexDescriptor &desc) { - return default_index_type_from_descriptor(desc.proto()); -} - -inline IndexDescriptor get_descriptor_from_index(const Index& index) { - return util::variant_match(index, [] (const auto& idx) { - return static_cast(idx); - }); -} - -inline Index empty_index() { - return RowCountIndex::default_index(); -} +Index variant_index_from_type(IndexDescriptor::Type type); +Index default_index_type_from_descriptor(const IndexDescriptor& desc); +IndexDescriptor get_descriptor_from_index(const Index& index); +Index empty_index(); }