Implement empty index for 0-rowed columns (#1429)
#### Reference Issues/PRs
Closes: #1428 

#### What does this implement or fix?
Create an empty index type. This required changes in both the Python and
the C++ layers.
* In the C++ layer a new index type was added
(IndexDescriptor::EMPTY). It does not allocate a field in the storage
(similar to how the row range index does not allocate a field). The checks
for index compatibility are relaxed: the empty index is compatible with
all other index types, and it gets overridden the first time a non-empty
index is written (either through update or append). On write we check if
the dataframe contains 0 rows, and if so it is assigned an empty index.
* The logic in the Python layer is dodgy and needs discussion. In the
current state the normalization metadata and the index descriptor are
stored separately. There is one proto message describing both the DateTime
index and the Ranged index. With this change, in the case of 0
rows the Python layer passes a RowRange index to the C++ layer, which
checks whether there are any rows in the DF. If there are rows, the RowRange
index is used; otherwise the empty index is used. Note the
`is_not_range_index` proto field. IMO it needs some refactoring in
further PRs. It's used in the Python layer to check whether the first column
is an index or not.
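
The rules in the first bullet can be sketched roughly as follows. This is a minimal illustration and not the code from this commit: `IndexKind` and the three helper functions are hypothetical names, and the behaviour of appending a 0-row frame to a non-empty index is an assumption.

```cpp
#include <cstddef>

// Hypothetical stand-in for IndexDescriptor::Type; only the kinds mirror the commit.
enum class IndexKind { Empty, RowCount, Timestamp, String };

// On write: a frame with zero rows is assigned the empty index,
// regardless of the index kind the caller asked for.
constexpr IndexKind index_for_write(IndexKind requested, std::size_t row_count) {
    return row_count == 0 ? IndexKind::Empty : requested;
}

// Relaxed compatibility check: the empty index is compatible with every kind.
constexpr bool index_compatible(IndexKind stored, IndexKind incoming) {
    return stored == IndexKind::Empty || incoming == IndexKind::Empty || stored == incoming;
}

// On update/append: the first non-empty index overrides a stored empty index.
// Assumption: an incoming empty index leaves a stored non-empty index untouched.
constexpr IndexKind index_after_append(IndexKind stored, IndexKind incoming) {
    return stored == IndexKind::Empty ? incoming : stored;
}
```

Under these assumptions, writing a 0-row frame with a requested timestamp index stores `Empty`, and a later append of timestamp-indexed rows passes the compatibility check and switches the stored kind to `Timestamp`.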
#### Any other comments?

Merge this after: #1436

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
 - [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

---------

Co-authored-by: Vasil Pashov <[email protected]>
vasil-pashov and Vasil Pashov committed Apr 24, 2024
1 parent 504636f commit aa3389d
Showing 25 changed files with 453 additions and 322 deletions.
9 changes: 7 additions & 2 deletions cpp/arcticdb/entity/merge_descriptors.cpp
@@ -46,8 +46,13 @@ StreamDescriptor merge_descriptors(
// Merge all the fields for all slices, apart from the index which we already have from the first descriptor.
// Note that we preserve the ordering as we see columns, especially the index which needs to be column 0.
for (const auto &fields : entries) {
-if(has_index)
-util::variant_match(index, [&fields] (const auto& idx) { idx.check(*fields); });
+if (has_index) {
+util::variant_match(index,
+[](const EmptyIndex&) {},
+[](const RowCountIndex&) {},
+[&fields] (const auto& idx) { idx.check(*fields); }
+);
+}

for (size_t idx = has_index ? 1u : 0u; idx < static_cast<size_t>(fields->size()); ++idx) {
const auto& field = fields->at(idx);
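
The hunk above dispatches on the index variant and skips the field check for the empty and row-count indexes. A standard-library analogue of that pattern is sketched below using `std::visit` and the usual C++17 `overloaded` helper. The index structs, the `check` placeholder and `index_accepts` are hypothetical, and the real `util::variant_match` may differ in signature and behaviour (the real lambdas return nothing, while this sketch returns a `bool` so the outcome is observable).

```cpp
#include <variant>

// Hypothetical stand-ins for the stream index types.
struct EmptyIndex {};
struct RowCountIndex {};
struct TimeseriesIndex {
    // Placeholder for idx.check(fields): here it only requires at least one field.
    constexpr bool check(int field_count) const { return field_count > 0; }
};
using Index = std::variant<EmptyIndex, RowCountIndex, TimeseriesIndex>;

// The common C++17 helper that merges several lambdas into one visitor.
template <class... Fs> struct overloaded : Fs... { using Fs::operator()...; };
template <class... Fs> overloaded(Fs...) -> overloaded<Fs...>;

// Exact-type lambdas win over the generic one, so EmptyIndex and
// RowCountIndex never reach the check() call.
constexpr bool index_accepts(const Index& index, int field_count) {
    return std::visit(overloaded{
        [](const EmptyIndex&) { return true; },
        [](const RowCountIndex&) { return true; },
        [field_count](const auto& idx) { return idx.check(field_count); }
    }, index);
}
```

The non-template overloads are preferred during overload resolution, which is why the generic lambda can call `check` even though `EmptyIndex` in this sketch has no such member.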
3 changes: 1 addition & 2 deletions cpp/arcticdb/entity/stream_descriptor.hpp
@@ -22,7 +22,6 @@ struct StreamDescriptor {

std::shared_ptr<Proto> data_ = std::make_shared<Proto>();
std::shared_ptr<FieldCollection> fields_ = std::make_shared<FieldCollection>();
-;

StreamDescriptor() = default;
~StreamDescriptor() = default;
@@ -65,7 +64,7 @@ struct StreamDescriptor {
data_->set_sorted(sorted_value_to_proto(sorted));
}

-SortedValue get_sorted() {
+SortedValue get_sorted() const {
return sorted_value_from_proto(data_->sorted());
}

65 changes: 65 additions & 0 deletions cpp/arcticdb/entity/types_proto.cpp
@@ -123,4 +123,69 @@ namespace arcticdb::entity {
}, id);
}

+IndexDescriptor::IndexDescriptor(size_t field_count, Type type) {
+data_.set_kind(type);
+data_.set_field_count(static_cast<uint32_t>(field_count));
+}
+
+IndexDescriptor::IndexDescriptor(arcticdb::proto::descriptors::IndexDescriptor data)
+: data_(std::move(data)) {
+}
+
+bool IndexDescriptor::uninitialized() const {
+return data_.field_count() == 0 && data_.kind() == Type::IndexDescriptor_Type_UNKNOWN;
+}
+
+const IndexDescriptor::Proto& IndexDescriptor::proto() const {
+return data_;
+}
+
+size_t IndexDescriptor::field_count() const {
+return static_cast<size_t>(data_.field_count());
+}
+
+IndexDescriptor::Type IndexDescriptor::type() const {
+return data_.kind();
+}
+
+void IndexDescriptor::set_type(Type type) {
+data_.set_kind(type);
+}
+
+bool operator==(const IndexDescriptor& left, const IndexDescriptor& right) {
+return left.type() == right.type();
+}
+
+IndexDescriptor::TypeChar to_type_char(IndexDescriptor::Type type) {
+switch (type) {
+case IndexDescriptor::EMPTY: return 'E';
+case IndexDescriptor::TIMESTAMP: return 'T';
+case IndexDescriptor::ROWCOUNT: return 'R';
+case IndexDescriptor::STRING: return 'S';
+case IndexDescriptor::UNKNOWN: return 'U';
+default: util::raise_rte("Unknown index type: {}", int(type));
+}
+}
+
+IndexDescriptor::Type from_type_char(IndexDescriptor::TypeChar type) {
+switch (type) {
+case 'E': return IndexDescriptor::EMPTY;
+case 'T': return IndexDescriptor::TIMESTAMP;
+case 'R': return IndexDescriptor::ROWCOUNT;
+case 'S': return IndexDescriptor::STRING;
+case 'U': return IndexDescriptor::UNKNOWN;
+default: util::raise_rte("Unknown index type: {}", int(type));
+}
+}
+
+const char* index_type_to_str(IndexDescriptor::Type type) {
+switch (type) {
+case IndexDescriptor::EMPTY: return "Empty";
+case IndexDescriptor::TIMESTAMP: return "Timestamp";
+case IndexDescriptor::ROWCOUNT: return "Row count";
+case IndexDescriptor::STRING: return "String";
+case IndexDescriptor::UNKNOWN: return "Unknown";
+default: util::raise_rte("Unknown index type: {}", int(type));
+}
+}
} // namespace arcticdb
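
The new `'E'` case keeps the type-to-char mapping and its inverse in step. A minimal self-contained sketch of that round trip follows. `IndexKind` is a hypothetical stand-in for the protobuf enum, `std::invalid_argument` replaces `util::raise_rte`, and the functions are marked `constexpr` here only so the mapping can be checked at compile time (the commit itself moves them out of the header as ordinary functions).

```cpp
#include <stdexcept>

// Hypothetical stand-in for arcticdb::proto::descriptors::IndexDescriptor::Type.
enum class IndexKind { Unknown, Empty, RowCount, String, Timestamp };

// Map an index kind to its one-character code.
constexpr char to_type_char(IndexKind kind) {
    switch (kind) {
        case IndexKind::Empty:     return 'E';
        case IndexKind::Timestamp: return 'T';
        case IndexKind::RowCount:  return 'R';
        case IndexKind::String:    return 'S';
        case IndexKind::Unknown:   return 'U';
    }
    throw std::invalid_argument("Unknown index type");
}

// Inverse mapping; any other character is rejected.
constexpr IndexKind from_type_char(char code) {
    switch (code) {
        case 'E': return IndexKind::Empty;
        case 'T': return IndexKind::Timestamp;
        case 'R': return IndexKind::RowCount;
        case 'S': return IndexKind::String;
        case 'U': return IndexKind::Unknown;
        default:  throw std::invalid_argument("Unknown index type char");
    }
}
```

With both tables updated together, `from_type_char(to_type_char(k))` returns `k` for every kind, including the new empty one.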
72 changes: 16 additions & 56 deletions cpp/arcticdb/entity/types_proto.hpp
@@ -49,69 +49,29 @@ namespace arcticdb::entity {
Proto data_;
using Type = arcticdb::proto::descriptors::IndexDescriptor::Type;

-static const Type UNKNOWN = arcticdb::proto::descriptors::IndexDescriptor_Type_UNKNOWN;
-static const Type ROWCOUNT = arcticdb::proto::descriptors::IndexDescriptor_Type_ROWCOUNT;
-static const Type STRING = arcticdb::proto::descriptors::IndexDescriptor_Type_STRING;
-static const Type TIMESTAMP = arcticdb::proto::descriptors::IndexDescriptor_Type_TIMESTAMP;
+static constexpr Type UNKNOWN = arcticdb::proto::descriptors::IndexDescriptor_Type_UNKNOWN;
+static constexpr Type EMPTY = arcticdb::proto::descriptors::IndexDescriptor_Type_EMPTY;
+static constexpr Type ROWCOUNT = arcticdb::proto::descriptors::IndexDescriptor_Type_ROWCOUNT;
+static constexpr Type STRING = arcticdb::proto::descriptors::IndexDescriptor_Type_STRING;
+static constexpr Type TIMESTAMP = arcticdb::proto::descriptors::IndexDescriptor_Type_TIMESTAMP;

using TypeChar = char;

IndexDescriptor() = default;
-IndexDescriptor(size_t field_count, Type type) {
-data_.set_kind(type);
-data_.set_field_count(static_cast<uint32_t>(field_count));
-}
-
-explicit IndexDescriptor(arcticdb::proto::descriptors::IndexDescriptor data)
-: data_(std::move(data)) {
-}
-
-bool uninitialized() const {
-return data_.field_count() == 0 && data_.kind() == Type::IndexDescriptor_Type_UNKNOWN;
-}
-
-const Proto& proto() const {
-return data_;
-}
-
-size_t field_count() const {
-return static_cast<size_t>(data_.field_count());
-}
-
-Type type() const {
-return data_.kind();
-}
-
-void set_type(Type type) {
-data_.set_kind(type);
-}
-
ARCTICDB_MOVE_COPY_DEFAULT(IndexDescriptor)
-
-friend bool operator==(const IndexDescriptor& left, const IndexDescriptor& right) {
-return left.type() == right.type();
-}
+IndexDescriptor(size_t field_count, Type type);
+explicit IndexDescriptor(arcticdb::proto::descriptors::IndexDescriptor data);
+bool uninitialized() const;
+const Proto& proto() const;
+size_t field_count() const;
+Type type() const;
+void set_type(Type type);
+friend bool operator==(const IndexDescriptor& left, const IndexDescriptor& right);
};

-constexpr IndexDescriptor::TypeChar to_type_char(IndexDescriptor::Type type) {
-switch (type) {
-case IndexDescriptor::TIMESTAMP:return 'T';
-case IndexDescriptor::ROWCOUNT:return 'R';
-case IndexDescriptor::STRING:return 'S';
-case IndexDescriptor::UNKNOWN:return 'U';
-default:util::raise_rte("Unknown index type: {}", int(type));
-}
-}
-
-constexpr IndexDescriptor::Type from_type_char(IndexDescriptor::TypeChar type) {
-switch (type) {
-case 'T': return IndexDescriptor::TIMESTAMP;
-case 'R': return IndexDescriptor::ROWCOUNT;
-case 'S': return IndexDescriptor::STRING;
-case 'U': return IndexDescriptor::UNKNOWN;
-default:util::raise_rte("Unknown index type: {}", int(type));
-}
-}
+IndexDescriptor::TypeChar to_type_char(IndexDescriptor::Type type);
+IndexDescriptor::Type from_type_char(IndexDescriptor::TypeChar type);
+const char* index_type_to_str(IndexDescriptor::Type type);

void set_id(arcticdb::proto::descriptors::StreamDescriptor& pb_desc, StreamId id);

4 changes: 2 additions & 2 deletions cpp/arcticdb/pipeline/frame_utils.cpp
@@ -148,8 +148,8 @@ std::pair<size_t, size_t> offset_and_row_count(const std::shared_ptr<pipelines::
return std::make_pair(offset, row_count);
}

-bool index_is_not_timeseries_or_is_sorted_ascending(const std::shared_ptr<pipelines::InputTensorFrame>& frame) {
-return !std::holds_alternative<stream::TimeseriesIndex>(frame->index) || frame->desc.get_sorted() == SortedValue::ASCENDING;
+bool index_is_not_timeseries_or_is_sorted_ascending(const pipelines::InputTensorFrame& frame) {
+return !std::holds_alternative<stream::TimeseriesIndex>(frame.index) || frame.desc.get_sorted() == SortedValue::ASCENDING;
}

}
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/frame_utils.hpp
@@ -311,6 +311,6 @@ size_t get_slice_rowcounts(
std::pair<size_t, size_t> offset_and_row_count(
const std::shared_ptr<pipelines::PipelineContext>& context);

-bool index_is_not_timeseries_or_is_sorted_ascending(const std::shared_ptr<pipelines::InputTensorFrame>& frame);
+bool index_is_not_timeseries_or_is_sorted_ascending(const pipelines::InputTensorFrame& frame);

} //namespace arcticdb
2 changes: 1 addition & 1 deletion cpp/arcticdb/pipeline/index_writer.hpp
@@ -17,7 +17,7 @@

namespace arcticdb::pipelines::index {
// TODO: change the name - something like KeysSegmentWriter or KeyAggragator or better
-template<class Index, std::enable_if_t<InputTensorFrame::is_valid_index_v<Index>, bool> = 0>
+template<ValidIndex Index>
class IndexWriter {
// All index segments are row-count indexed in the sense that the keys are
// already ordered - they don't need an additional index
18 changes: 12 additions & 6 deletions cpp/arcticdb/pipeline/input_tensor_frame.hpp
@@ -18,14 +18,20 @@ namespace arcticdb::pipelines {

using namespace arcticdb::entity;

-struct InputTensorFrame {
+/// @TODO Move to a separate "util" header
+template <typename T, typename... U>
+concept is_any_of = (std::same_as<T, U> || ...);
+
+template <typename IndexT>
+concept ValidIndex = is_any_of<
+std::remove_cvref_t<std::remove_pointer_t<std::decay_t<IndexT>>>,
+stream::TimeseriesIndex,
+stream::RowCountIndex,
+stream::TableIndex,
+stream::EmptyIndex>;

-template<class T>
-static constexpr bool is_valid_index_v =
-std::is_same_v<T, stream::TimeseriesIndex> ||
-std::is_same_v<T, stream::RowCountIndex> ||
-std::is_same_v<T, stream::TableIndex>;

+struct InputTensorFrame {
InputTensorFrame() :
index(stream::empty_index()) {}
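
The `is_any_of` concept above is a fold expression over a list of candidate types. The same membership test is sketched below as a C++17 variable template rather than a C++20 concept, so that it compiles without concepts support; this is a swapped-in equivalent for illustration, not the form used in the commit. The index structs and the `_v` names are hypothetical.

```cpp
#include <type_traits>

// Hypothetical stand-ins for the stream index types.
struct TimeseriesIndex {};
struct RowCountIndex {};
struct TableIndex {};
struct EmptyIndex {};

// True when T is exactly one of the types in U...; false for an empty list.
template <typename T, typename... U>
inline constexpr bool is_any_of_v = (std::is_same_v<T, U> || ...);

// Strip references, cv-qualifiers and one level of pointer, so that
// `const EmptyIndex&` and `const EmptyIndex*` both reduce to EmptyIndex.
template <typename T>
using bare_index_t = std::remove_cv_t<std::remove_pointer_t<std::decay_t<T>>>;

template <typename IndexT>
inline constexpr bool is_valid_index_v = is_any_of_v<
    bare_index_t<IndexT>,
    TimeseriesIndex,
    RowCountIndex,
    TableIndex,
    EmptyIndex>;
```

A template could then be constrained with `std::enable_if_t<is_valid_index_v<Index>, bool> = 0`, which is the pre-concepts form that the `IndexWriter` hunk replaces with `template<ValidIndex Index>`.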
