Skip to content

Commit

Permalink
enhance: add valid_data in span (#35030)
Browse files Browse the repository at this point in the history
#31728

Signed-off-by: lixinguo <[email protected]>
Co-authored-by: lixinguo <[email protected]>
  • Loading branch information
smellthemoon and lixinguo authored Aug 2, 2024
1 parent f466129 commit 475c333
Show file tree
Hide file tree
Showing 16 changed files with 258 additions and 47 deletions.
35 changes: 29 additions & 6 deletions internal/core/src/common/Span.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ class SpanBase {
int64_t element_sizeof)
: data_(data), row_count_(row_count), element_sizeof_(element_sizeof) {
}
explicit SpanBase(const void* data,
const bool* valid_data,
int64_t row_count,
int64_t element_sizeof)
: data_(data),
valid_data_(valid_data),
row_count_(row_count),
element_sizeof_(element_sizeof) {
}

int64_t
row_count() const {
Expand All @@ -49,8 +58,14 @@ class SpanBase {
return data_;
}

const bool*
valid_data() const {
return valid_data_;
}

private:
const void* data_;
const bool* valid_data_{nullptr};
int64_t row_count_;
int64_t element_sizeof_;
};
Expand All @@ -65,20 +80,22 @@ class Span<T,
std::is_same_v<T, PkType>>> {
public:
using embedded_type = T;
explicit Span(const T* data, int64_t row_count)
: data_(data), row_count_(row_count) {
explicit Span(const T* data, const bool* valid_data, int64_t row_count)
: data_(data), valid_data_(valid_data), row_count_(row_count) {
}

explicit Span(std::string_view data) {
Span(data.data(), data.size());
explicit Span(std::string_view data, bool* valid_data) {
Span(data.data(), valid_data, data.size());
}

operator SpanBase() const {
return SpanBase(data_, row_count_, sizeof(T));
return SpanBase(data_, valid_data_, row_count_, sizeof(T));
}

explicit Span(const SpanBase& base)
: Span(reinterpret_cast<const T*>(base.data()), base.row_count()) {
: Span(reinterpret_cast<const T*>(base.data()),
base.valid_data(),
base.row_count()) {
assert(base.element_sizeof() == sizeof(T));
}

Expand All @@ -92,6 +109,11 @@ class Span<T,
return data_;
}

const bool*
valid_data() const {
return valid_data_;
}

const T&
operator[](int64_t offset) const {
return data_[offset];
Expand All @@ -104,6 +126,7 @@ class Span<T,

private:
const T* data_;
const bool* valid_data_;
const int64_t row_count_;
};

Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/exec/expression/CompareExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ PhyCompareFilterExpr::GetChunkData<std::string>(FieldId field_id,
return [chunk_data](int i) -> const number { return chunk_data[i]; };
} else {
auto chunk_data =
segment_->chunk_view<std::string_view>(field_id, chunk_id).data();
segment_->chunk_view<std::string_view>(field_id, chunk_id)
.first.data();
return [chunk_data](int i) -> const number {
return std::string(chunk_data[i]);
};
Expand Down
7 changes: 5 additions & 2 deletions internal/core/src/exec/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,11 @@ class SegmentExpr : public Expr {

auto& skip_index = segment_->GetSkipIndex();
if (!skip_func || !skip_func(skip_index, field_id_, 0)) {
auto data_vec = segment_->get_batch_views<T>(
field_id_, 0, current_data_chunk_pos_, need_size);
auto data_vec =
segment_
->get_batch_views<T>(
field_id_, 0, current_data_chunk_pos_, need_size)
.first;

func(data_vec.data(), need_size, res, values...);
}
Expand Down
23 changes: 23 additions & 0 deletions internal/core/src/mmap/ChunkVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class ChunkVectorBase {
get_chunk_size(int64_t index) = 0;
virtual Type
get_element(int64_t chunk_id, int64_t chunk_offset) = 0;
virtual int64_t
get_element_size() = 0;
virtual int64_t
get_element_offset(int64_t index) = 0;
virtual ChunkViewType<Type>
view_element(int64_t chunk_id, int64_t chunk_offset) = 0;
int64_t
Expand Down Expand Up @@ -166,6 +170,25 @@ class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
vec_.clear();
}

int64_t
get_element_size() override {
std::shared_lock<std::shared_mutex> lck(mutex_);
if constexpr (IsMmap && std::is_same_v<std::string, Type>) {
return sizeof(ChunkViewType<Type>);
}
return sizeof(Type);
}

int64_t
get_element_offset(int64_t index) override {
std::shared_lock<std::shared_mutex> lck(mutex_);
int64_t offset = 0;
for (int i = 0; i < index - 1; i++) {
offset += vec_[i].size();
}
return offset;
}

SpanBase
get_span(int64_t chunk_id) override {
std::shared_lock<std::shared_mutex> lck(mutex_);
Expand Down
24 changes: 16 additions & 8 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ class ColumnBase {
SetPaddingSize(data_type);

if (IsVariableDataType(data_type)) {
if (field_meta.is_nullable()) {
nullable_ = true;
valid_data_.reserve(reserve);
}
return;
}

Expand Down Expand Up @@ -214,7 +218,7 @@ class ColumnBase {
ColumnBase(ColumnBase&& column) noexcept
: data_(column.data_),
nullable_(column.nullable_),
valid_data_(column.valid_data_),
valid_data_(std::move(column.valid_data_)),
padding_(column.padding_),
type_size_(column.type_size_),
num_rows_(column.num_rows_),
Expand Down Expand Up @@ -282,7 +286,7 @@ class ColumnBase {
"GetBatchBuffer only supported for VariableColumn");
}

virtual std::vector<std::string_view>
virtual std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringViews() const {
PanicInfo(ErrorCode::Unsupported,
"StringViews only supported for VariableColumn");
Expand Down Expand Up @@ -519,7 +523,8 @@ class Column : public ColumnBase {

SpanBase
Span() const override {
return SpanBase(data_, num_rows_, data_cap_size_ / num_rows_);
return SpanBase(
data_, valid_data_.data(), num_rows_, data_cap_size_ / num_rows_);
}
};

Expand Down Expand Up @@ -681,7 +686,7 @@ class VariableColumn : public ColumnBase {
"span() interface is not implemented for variable column");
}

std::vector<std::string_view>
std::pair<std::vector<std::string_view>, FixedVector<bool>>
StringViews() const override {
std::vector<std::string_view> res;
char* pos = data_;
Expand All @@ -692,7 +697,7 @@ class VariableColumn : public ColumnBase {
res.emplace_back(std::string_view(pos, size));
pos += size;
}
return res;
return std::make_pair(res, valid_data_);
}

[[nodiscard]] std::vector<ViewType>
Expand Down Expand Up @@ -861,7 +866,10 @@ class ArrayColumn : public ColumnBase {

SpanBase
Span() const override {
return SpanBase(views_.data(), views_.size(), sizeof(ArrayView));
return SpanBase(views_.data(),
valid_data_.data(),
views_.size(),
sizeof(ArrayView));
}

[[nodiscard]] const std::vector<ArrayView>&
Expand All @@ -885,8 +893,8 @@ class ArrayColumn : public ColumnBase {
element_indices_.emplace_back(array.get_offsets());
if (nullable_) {
return ColumnBase::Append(static_cast<const char*>(array.data()),
array.byte_size(),
valid_data);
valid_data,
array.byte_size());
}
ColumnBase::Append(static_cast<const char*>(array.data()),
array.byte_size());
Expand Down
7 changes: 4 additions & 3 deletions internal/core/src/query/groupby/SearchGroupByOperator.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ class SealedDataGetter : public DataGetter<T> {
if constexpr (std::is_same_v<T, std::string>) {
str_field_data_ =
std::make_shared<std::vector<std::string_view>>(
segment.chunk_view<std::string_view>(field_id, 0));
segment.chunk_view<std::string_view>(field_id, 0)
.first);
} else {
auto span = segment.chunk_data<T>(field_id, 0);
field_data_ =
std::make_shared<Span<T>>(span.data(), span.row_count());
field_data_ = std::make_shared<Span<T>>(
span.data(), span.valid_data(), span.row_count());
}
} else if (segment.HasIndex(field_id)) {
this->field_index_ = &(segment.chunk_scalar_index<T>(field_id, 0));
Expand Down
26 changes: 26 additions & 0 deletions internal/core/src/segcore/ConcurrentVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ class VectorBase {
virtual int64_t
get_chunk_size(ssize_t chunk_index) const = 0;

virtual int64_t
get_element_size() const = 0;

virtual int64_t
get_element_offset(ssize_t chunk_index) const = 0;

virtual ssize_t
num_chunk() const = 0;

Expand Down Expand Up @@ -245,6 +251,26 @@ class ConcurrentVectorImpl : public VectorBase {
return chunks_ptr_->get_chunk_size(chunk_index);
}

int64_t
get_element_size() const override {
if constexpr (is_type_entire_row) {
return chunks_ptr_->get_element_size();
} else if constexpr (std::is_same_v<Type, int64_t> || // NOLINT
std::is_same_v<Type, int>) {
// only for testing
PanicInfo(NotImplemented, "unimplemented");
} else {
static_assert(
std::is_same_v<typename TraitType::embedded_type, Type>);
return elements_per_row_;
}
}

int64_t
get_element_offset(ssize_t chunk_index) const override {
return chunks_ptr_->get_element_offset(chunk_index);
}

// just for fun, don't use it directly
const Type*
get_element(ssize_t element_index) const {
Expand Down
29 changes: 28 additions & 1 deletion internal/core/src/segcore/InsertRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,13 @@ class ThreadSafeValidData {
return data_[offset];
}

bool*
get_chunk_data(size_t offset) {
std::shared_lock<std::shared_mutex> lck(mutex_);
Assert(offset < length_);
return &data_[offset];
}

private:
mutable std::shared_mutex mutex_{};
FixedVector<bool> data_;
Expand Down Expand Up @@ -770,10 +777,30 @@ struct InsertRecord {
}

bool
is_valid_data_exist(FieldId field_id) {
is_data_exist(FieldId field_id) const {
return data_.find(field_id) != data_.end();
}

bool
is_valid_data_exist(FieldId field_id) const {
return valid_data_.find(field_id) != valid_data_.end();
}

SpanBase
get_span_base(FieldId field_id, int64_t chunk_id) const {
auto data = get_data_base(field_id);
if (is_valid_data_exist(field_id)) {
auto size = data->get_chunk_size(chunk_id);
auto element_offset = data->get_element_offset(chunk_id);
return SpanBase(
data->get_chunk_data(chunk_id),
get_valid_data(field_id)->get_chunk_data(element_offset),
size,
data->get_element_size());
}
return data->get_span_base(chunk_id);
}

// append a column of scalar or sparse float vector type
template <typename Type>
void
Expand Down
5 changes: 2 additions & 3 deletions internal/core/src/segcore/SegmentGrowingImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,10 @@ SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {

SpanBase
SegmentGrowingImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
auto vec = get_insert_record().get_data_base(field_id);
return vec->get_span_base(chunk_id);
return get_insert_record().get_span_base(field_id, chunk_id);
}

std::vector<std::string_view>
std::pair<std::vector<std::string_view>, FixedVector<bool>>
SegmentGrowingImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const {
PanicInfo(ErrorCode::NotImplemented,
"chunk view impl not implement for growing segment");
Expand Down
12 changes: 10 additions & 2 deletions internal/core/src/segcore/SegmentGrowingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ class SegmentGrowingImpl : public SegmentGrowing {
return id_;
}

bool
is_nullable(FieldId field_id) const override {
AssertInfo(insert_record_.is_data_exist(field_id),
"Cannot find field_data with field_id: " +
std::to_string(field_id.get()));
return insert_record_.is_valid_data_exist(field_id);
};

public:
const InsertRecord<>&
get_insert_record() const {
Expand Down Expand Up @@ -318,10 +326,10 @@ class SegmentGrowingImpl : public SegmentGrowing {
SpanBase
chunk_data_impl(FieldId field_id, int64_t chunk_id) const override;

std::vector<std::string_view>
std::pair<std::vector<std::string_view>, FixedVector<bool>>
chunk_view_impl(FieldId field_id, int64_t chunk_id) const override;

BufferView
std::pair<BufferView, FixedVector<bool>>
get_chunk_buffer(FieldId field_id,
int64_t chunk_id,
int64_t start_offset,
Expand Down
Loading

0 comments on commit 475c333

Please sign in to comment.