Skip to content

Commit

Permalink
Dev/vasil.pashov/index file refactor (#1472)
Browse files Browse the repository at this point in the history
Refactor index.hpp

Move the function definitions from index.hpp into a new index.cpp file. Add explicit
template instantiations for the CRTP base class in the .cpp file.

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
 - [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

<!--
Thanks for contributing a Pull Request to ArcticDB! Please ensure you
have taken a look at:
- ArcticDB's Code of Conduct:
https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md
- ArcticDB's Contribution Licensing:
https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing
-->

---------

Co-authored-by: Vasil Pashov <[email protected]>
  • Loading branch information
vasil-pashov and Vasil Pashov authored Apr 2, 2024
1 parent a14eb48 commit e6c4f28
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 241 deletions.
1 change: 1 addition & 0 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ set(arcticdb_srcs
storage/storage_factory.cpp
stream/aggregator.cpp
stream/append_map.cpp
stream/index.cpp
stream/piloted_clock.cpp
toolbox/library_tool.cpp
util/allocator.cpp
Expand Down
252 changes: 252 additions & 0 deletions cpp/arcticdb/stream/index.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
/* Copyright 2024 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software
* will be governed by the Apache License, version 2.0.
*/

#include <arcticdb/stream/index.hpp>
#include <arcticdb/column_store/memory_segment.hpp>
#include <arcticdb/pipeline/index_fields.hpp>
#include <arcticdb/entity/type_utils.hpp>


namespace arcticdb::stream {

// Classifies an atom key by the alternative held in its start index:
// TIMESTAMP when it holds a `timestamp`, STRING for anything else.
IndexDescriptor::Type get_index_value_type(const AtomKey& key) {
    if (std::holds_alternative<timestamp>(key.start_index())) {
        return IndexDescriptor::TIMESTAMP;
    }
    return IndexDescriptor::STRING;
}

// Convenience overload taking a braced list of fields.
// The list is copied into a vector and create_stream_descriptor is called again with a
// folly::range view over that vector.
template <typename Derived>
StreamDescriptor BaseIndex<Derived>::create_stream_descriptor(
        StreamId stream_id,
        std::initializer_list<FieldRef> fields
) const {
    std::vector<FieldRef> field_refs(fields.begin(), fields.end());
    return create_stream_descriptor(stream_id, folly::range(field_refs));
}

// Downcasts this base object to the concrete index type supplied as the template argument.
template <typename Derived>
const Derived* BaseIndex<Derived>::derived() const {
    const auto* self = static_cast<const Derived*>(this);
    return self;
}

// Conversion to an IndexDescriptor built from the derived type's static field count and type.
template <typename Derived>
BaseIndex<Derived>::operator IndexDescriptor() const {
    return IndexDescriptor{Derived::field_count(), Derived::type()};
}

// Describes the index field: its type comes from Derived::TypeDescTag and its name from
// the derived index's name(). The position argument is ignored.
template <typename Derived>
FieldRef BaseIndex<Derived>::field(size_t) const {
    using Tag = typename Derived::TypeDescTag;
    return {
        static_cast<TypeDescriptor>(Tag{}),
        std::string_view(derived()->name())
    };
}

TimeseriesIndex::TimeseriesIndex(const std::string& name) : name_(name) {}

// Returns a timeseries index named with the class-level DefaultName.
TimeseriesIndex TimeseriesIndex::default_index() {
    return TimeseriesIndex{DefaultName};
}

// Validates that `fields` is usable with this timeseries index.
//
// Two conditions are enforced through util::check_arg:
//   1. `fields` contains at least field_count() entries;
//   2. the type of fields[0] and the type of this index's field(0) are either trivially
//      compatible (trivially_compatible_types) or has_valid_type_promotion yields a value
//      for them.
void TimeseriesIndex::check(const FieldCollection& fields) const {
    const size_t fields_size = fields.size();
    // Kept as size_t so the comparison below does not mix signed and unsigned operands.
    constexpr size_t required_fields_size = static_cast<size_t>(field_count());

    // The size must be validated before fields[0] is read: with an empty collection the
    // element access below would otherwise happen before the intended error is raised.
    util::check_arg(
        fields_size >= required_fields_size,
        "expected at least {} fields, actual {}",
        required_fields_size,
        fields_size
    );

    const TypeDescriptor& first_field_type = fields[0].type();
    const TypeDescriptor& current_first_field_type = this->field(0).type();

    const bool valid_type_promotion = has_valid_type_promotion(first_field_type, current_first_field_type).has_value();
    const bool trivial_type_compatibility = trivially_compatible_types(first_field_type, current_first_field_type);

    const bool compatible_types = valid_type_promotion || trivial_type_compatibility;

    util::check_arg(compatible_types, "expected field[0]={}, actual {}", this->field(0), fields[0]);
}

// First index value of a data segment: the timestamp at row 0, column 0.
// An empty segment yields NumericIndex{0}.
IndexValue TimeseriesIndex::start_value_for_segment(const SegmentInMemory& segment) {
    if (segment.row_count() != 0) {
        auto ts = segment.scalar_at<timestamp>(0, 0).value();
        return {ts};
    }
    return {NumericIndex{0}};
}

// Last index value of a data segment: the timestamp at the final row, column 0.
// An empty segment yields NumericIndex{0}.
IndexValue TimeseriesIndex::end_value_for_segment(const SegmentInMemory& segment) {
    auto num_rows = segment.row_count();
    if (num_rows != 0) {
        auto ts = segment.scalar_at<timestamp>(num_rows - 1, 0).value();
        return {ts};
    }
    return {NumericIndex{0}};
}

// First index value of a keys segment: the timestamp at row 0 in the column numbered
// pipelines::index::Fields::start_index. An empty segment yields NumericIndex{0}.
IndexValue TimeseriesIndex::start_value_for_keys_segment(const SegmentInMemory& segment) {
    if (segment.row_count() != 0) {
        auto column = static_cast<int>(pipelines::index::Fields::start_index);
        auto ts = segment.scalar_at<timestamp>(0, column).value();
        return {ts};
    }
    return {NumericIndex{0}};
}

// Last index value of a keys segment: the timestamp at the final row in the column numbered
// pipelines::index::Fields::end_index. An empty segment yields NumericIndex{0}.
IndexValue TimeseriesIndex::end_value_for_keys_segment(const SegmentInMemory& segment) {
    auto num_rows = segment.row_count();
    if (num_rows != 0) {
        auto column = static_cast<int>(pipelines::index::Fields::end_index);
        auto ts = segment.scalar_at<timestamp>(num_rows - 1, column).value();
        return {ts};
    }
    return {NumericIndex{0}};
}

// Returns the stored index name as a C string. The pointer refers to name_'s own buffer.
const char* TimeseriesIndex::name() const {
    const char* index_name = name_.c_str();
    return index_name;
}

// Builds a timeseries index named after the descriptor's first field, or after DefaultName
// when the descriptor has no fields.
TimeseriesIndex TimeseriesIndex::make_from_descriptor(const StreamDescriptor& desc) {
    const bool has_fields = desc.field_count() > 0;
    if (!has_fields) {
        return TimeseriesIndex(DefaultName);
    }
    return TimeseriesIndex(std::string(desc.fields(0).name()));
}


TableIndex::TableIndex(const std::string& name) : name_(name) {
}

// Returns a table index named with the class-level DefaultName.
TableIndex TableIndex::default_index() {
    return TableIndex{DefaultName};
}

// Validates that `fields` is usable with this table index.
// Raises via util::check_arg when there are fewer than field_count() fields, and via
// util::check when the first field differs from this index's field(0).
void TableIndex::check(const FieldCollection& fields) const {
    const bool enough_fields = fields.size() >= int(field_count());
    util::check_arg(enough_fields, "expected at least {} fields, actual {}", field_count(), fields.size());

    const bool first_field_matches = fields.ref_at(0) == field(0);
    util::check(first_field_matches, "Field descriptor mismatch {} != {}", fields.ref_at(0), field(0));
}

// First index value of a data segment: the string at row 0, column 0.
// An empty segment yields NumericIndex{0}, matching start_value_for_keys_segment and the
// TimeseriesIndex equivalents; previously row 0 was read even when the segment had no rows.
IndexValue TableIndex::start_value_for_segment(const SegmentInMemory& segment) {
    if (segment.row_count() == 0)
        return {NumericIndex{0}};
    auto string_index = segment.string_at(0, 0).value();
    return {std::string{string_index}};
}

// Last index value of a data segment: the string at the final row, column 0.
// An empty segment yields NumericIndex{0}, matching end_value_for_keys_segment and the
// TimeseriesIndex equivalents; previously `row_count() - 1` was computed with zero rows,
// producing an invalid row id.
IndexValue TableIndex::end_value_for_segment(const SegmentInMemory& segment) {
    auto row_count = segment.row_count();
    if (row_count == 0)
        return {NumericIndex{0}};
    auto string_index = segment.string_at(row_count - 1, 0).value();
    return {std::string{string_index}};
}

// First index value of a keys segment: the string at row 0 in the column numbered
// pipelines::index::Fields::start_index. An empty segment yields NumericIndex{0}.
IndexValue TableIndex::start_value_for_keys_segment(const SegmentInMemory& segment) {
    if (segment.row_count() != 0) {
        auto column = static_cast<int>(pipelines::index::Fields::start_index);
        auto first_str = segment.string_at(0, column).value();
        return {std::string{first_str}};
    }
    return {NumericIndex{0}};
}

// Last index value of a keys segment: the string at the final row in the column numbered
// pipelines::index::Fields::end_index. An empty segment yields NumericIndex{0}.
IndexValue TableIndex::end_value_for_keys_segment(const SegmentInMemory& segment) {
    auto num_rows = segment.row_count();
    if (num_rows != 0) {
        auto column = static_cast<int>(pipelines::index::Fields::end_index);
        auto last_str = segment.string_at(num_rows - 1, column).value();
        return {std::string{last_str}};
    }
    return {NumericIndex{0}};
}

// Builds a table index named after the descriptor's first field, or after DefaultName
// when the descriptor has no fields.
TableIndex TableIndex::make_from_descriptor(const StreamDescriptor& desc) {
    const bool has_fields = desc.field_count() > 0;
    if (!has_fields) {
        return TableIndex(DefaultName);
    }
    return TableIndex(std::string(desc.field(0).name()));
}

// Returns the stored index name as a C string. The pointer refers to name_'s own buffer.
const char* TableIndex::name() const {
    const char* index_name = name_.c_str();
    return index_name;
}

// Returns a value-initialised RowCountIndex.
RowCountIndex RowCountIndex::default_index() {
    return RowCountIndex();
}


// First index value of a data segment: the segment's offset, converted to `timestamp`.
IndexValue RowCountIndex::start_value_for_segment(const SegmentInMemory& segment) {
    const auto first_row = segment.offset();
    return static_cast<timestamp>(first_row);
}

// Last index value of a data segment: offset + (row_count - 1), converted to `timestamp`.
// NOTE(review): with row_count() == 0 this evaluates offset + (0 - 1); confirm callers
// never pass an empty segment or that this result is what they expect.
IndexValue RowCountIndex::end_value_for_segment(const SegmentInMemory& segment) {
    const auto last_row = segment.offset() + (segment.row_count() - 1);
    return static_cast<timestamp>(last_row);
}

// First index value of a keys segment: the segment's offset, converted to `timestamp`.
IndexValue RowCountIndex::start_value_for_keys_segment(const SegmentInMemory& segment) {
    const auto first_row = segment.offset();
    return static_cast<timestamp>(first_row);
}

// Last index value of a keys segment: offset + (row_count - 1), converted to `timestamp`.
// NOTE(review): with row_count() == 0 this evaluates offset + (0 - 1); confirm callers
// never pass an empty segment or that this result is what they expect.
IndexValue RowCountIndex::end_value_for_keys_segment(const SegmentInMemory& segment) {
    const auto last_row = segment.offset() + (segment.row_count() - 1);
    return static_cast<timestamp>(last_row);
}

// The descriptor is ignored; the result is always the default row-count index.
RowCountIndex RowCountIndex::make_from_descriptor(const StreamDescriptor&) const {
    return default_index();
}

// First index value of a data segment: the segment's offset, converted to NumericIndex.
IndexValue EmptyIndex::start_value_for_segment(const SegmentInMemory& segment) {
    const auto offset = segment.offset();
    return static_cast<NumericIndex>(offset);
}

// Last index value of a data segment: the segment's offset, converted to NumericIndex
// (the same value that start_value_for_segment returns).
IndexValue EmptyIndex::end_value_for_segment(const SegmentInMemory& segment) {
    const auto offset = segment.offset();
    return static_cast<NumericIndex>(offset);
}

// First index value of a keys segment: the segment's offset, converted to NumericIndex.
IndexValue EmptyIndex::start_value_for_keys_segment(const SegmentInMemory& segment) {
    const auto offset = segment.offset();
    return static_cast<NumericIndex>(offset);
}

// Last index value of a keys segment: the segment's offset, converted to NumericIndex
// (the same value that start_value_for_keys_segment returns).
IndexValue EmptyIndex::end_value_for_keys_segment(const SegmentInMemory& segment) {
    const auto offset = segment.offset();
    return static_cast<NumericIndex>(offset);
}

// Builds the Index alternative matching the index kind recorded in a stream descriptor.
// TIMESTAMP and STRING indexes are built from the descriptor itself via
// make_from_descriptor; EMPTY and ROWCOUNT are default-constructed.
// Any other kind raises through util::raise_rte.
Index index_type_from_descriptor(const StreamDescriptor& desc) {
    const auto kind = desc.index().proto().kind();
    switch (kind) {
    case IndexDescriptor::EMPTY:
        return EmptyIndex{};
    case IndexDescriptor::TIMESTAMP:
        return TimeseriesIndex::make_from_descriptor(desc);
    case IndexDescriptor::STRING:
        return TableIndex::make_from_descriptor(desc);
    case IndexDescriptor::ROWCOUNT:
        return RowCountIndex{};
    default:
        util::raise_rte(
            "Data obtained from storage refers to an index type that this build of ArcticDB doesn't understand ({}).",
            int(kind)
        );
    }
}

// Builds the default Index alternative for the kind stored in an index descriptor proto.
// Unlike index_type_from_descriptor, no field names are consulted: each alternative comes
// from its default_index() factory (EmptyIndex is default-constructed).
// Any other kind raises through util::raise_rte.
Index default_index_type_from_descriptor(const IndexDescriptor::Proto& desc) {
    const auto kind = desc.kind();
    switch (kind) {
    case IndexDescriptor::EMPTY:
        return EmptyIndex{};
    case IndexDescriptor::TIMESTAMP:
        return TimeseriesIndex::default_index();
    case IndexDescriptor::STRING:
        return TableIndex::default_index();
    case IndexDescriptor::ROWCOUNT:
        return RowCountIndex::default_index();
    default:
        util::raise_rte("Unknown index type {} trying to generate index type", int(kind));
    }
}

// Overload for a full IndexDescriptor: unwraps its proto and defers to the proto overload.
Index default_index_type_from_descriptor(const IndexDescriptor& desc) {
    const auto& proto = desc.proto();
    return default_index_type_from_descriptor(proto);
}

// Converts whichever index alternative is held in `index` into its IndexDescriptor,
// using each alternative's conversion to IndexDescriptor.
IndexDescriptor get_descriptor_from_index(const Index& index) {
    return util::variant_match(
        index,
        [](const auto& held_index) -> IndexDescriptor {
            return static_cast<IndexDescriptor>(held_index);
        }
    );
}

// Returns the default RowCountIndex wrapped in an Index.
// NOTE(review): despite the name this yields a RowCountIndex, not an EmptyIndex — confirm
// that is intended before changing it.
Index empty_index() {
    Index result = RowCountIndex::default_index();
    return result;
}

// Explicit instantiations of the CRTP base for every concrete index type.
// The BaseIndex member templates are defined in this translation unit, so they are
// instantiated here explicitly for each derived type.
template class BaseIndex<TimeseriesIndex>;
template class BaseIndex<TableIndex>;
template class BaseIndex<RowCountIndex>;
template class BaseIndex<EmptyIndex>;
}
Loading

0 comments on commit e6c4f28

Please sign in to comment.