diff --git a/internal/core/src/common/Common.h b/internal/core/src/common/Common.h index 784fdfad04675..c398c161d58ea 100644 --- a/internal/core/src/common/Common.h +++ b/internal/core/src/common/Common.h @@ -46,4 +46,12 @@ SetCpuNum(const int core); void SetDefaultExecEvalExprBatchSize(int64_t val); +struct BufferView { + char* data_; + size_t size_; + + BufferView(char* data_ptr, size_t size) : data_(data_ptr), size_(size) { + } +}; + } // namespace milvus diff --git a/internal/core/src/common/EasyAssert.h b/internal/core/src/common/EasyAssert.h index 3b247600b376d..6eb4f46e17f93 100644 --- a/internal/core/src/common/EasyAssert.h +++ b/internal/core/src/common/EasyAssert.h @@ -64,6 +64,7 @@ enum ErrorCode { MemAllocateFailed = 2034, MemAllocateSizeNotMatch = 2035, MmapError = 2036, + OutOfRange = 2037, KnowhereError = 2100, // timeout or cancel related. diff --git a/internal/core/src/common/File.h b/internal/core/src/common/File.h index 801bd50b5e44d..f25f748ac184b 100644 --- a/internal/core/src/common/File.h +++ b/internal/core/src/common/File.h @@ -39,7 +39,7 @@ class File { "failed to create mmap file {}: {}", filepath, strerror(errno)); - return File(fd); + return File(fd, std::string(filepath)); } int @@ -47,11 +47,22 @@ class File { return fd_; } + std::string + Path() const { + return filepath_; + } + ssize_t Write(const void* buf, size_t size) { return write(fd_, buf, size); } + template , int> = 0> + ssize_t + WriteInt(T value) { + return write(fd_, &value, sizeof(value)); + } + offset_t Seek(offset_t offset, int whence) { return lseek(fd_, offset, whence); @@ -64,8 +75,10 @@ class File { } private: - explicit File(int fd) : fd_(fd) { + explicit File(int fd, const std::string& filepath) + : fd_(fd), filepath_(filepath) { } int fd_{-1}; + std::string filepath_; }; } // namespace milvus diff --git a/internal/core/src/exec/expression/CompareExpr.cpp b/internal/core/src/exec/expression/CompareExpr.cpp index 6ec731040b1f1..43dd6c039d4f0 100644 --- a/internal/core/src/exec/expression/CompareExpr.cpp +++ b/internal/core/src/exec/expression/CompareExpr.cpp @@ -77,7 +77,7 @@ PhyCompareFilterExpr::GetChunkData(FieldId field_id, return [chunk_data](int i) -> const number { return chunk_data[i]; }; } else { auto chunk_data = - segment_->chunk_data(field_id, chunk_id).data(); + segment_->chunk_view(field_id, chunk_id).data(); return [chunk_data](int i) -> const number { return std::string(chunk_data[i]); }; diff --git a/internal/core/src/exec/expression/ExistsExpr.cpp b/internal/core/src/exec/expression/ExistsExpr.cpp index 9331ae052317c..6798eeedb4210 100644 --- a/internal/core/src/exec/expression/ExistsExpr.cpp +++ b/internal/core/src/exec/expression/ExistsExpr.cpp @@ -26,7 +26,7 @@ PhyExistsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { case DataType::JSON: { if (is_index_mode_) { PanicInfo(ExprInvalid, - "exists expr for json index mode not supportted"); + "exists expr for json index mode not supported"); } result = EvalJsonExistsForDataSegment(); break; diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h index 684217acca683..84704e8ea6730 100644 --- a/internal/core/src/exec/expression/Expr.h +++ b/internal/core/src/exec/expression/Expr.h @@ -183,6 +183,32 @@ class SegmentExpr : public Expr { : batch_size_; } + // used for processing raw data expr for sealed segments. + // now only used for std::string_view && json + // TODO: support more types + template + int64_t + ProcessChunkForSealedSeg( + FUNC func, + std::function skip_func, + TargetBitmapView res, + ValTypes... values) { + // For sealed segment, only single chunk + Assert(num_data_chunk_ == 1); + auto need_size = + std::min(active_count_ - current_data_chunk_pos_, batch_size_); + + auto& skip_index = segment_->GetSkipIndex(); + if (!skip_func || !skip_func(skip_index, field_id_, 0)) { + auto data_vec = segment_->get_batch_views( + field_id_, 0, current_data_chunk_pos_, need_size); + + func(data_vec.data(), need_size, res, values...); + } + current_data_chunk_pos_ += need_size; + return need_size; + } + template int64_t ProcessDataChunks( @@ -191,6 +217,15 @@ class SegmentExpr : public Expr { TargetBitmapView res, ValTypes... values) { int64_t processed_size = 0; + + if constexpr (std::is_same_v || + std::is_same_v) { + if (segment_->type() == SegmentType::Sealed) { + return ProcessChunkForSealedSeg( + func, skip_func, res, values...); + } + } + for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) { auto data_pos = (i == current_data_chunk_) ? current_data_chunk_pos_ : 0; @@ -422,4 +457,4 @@ class ExprSet { }; } //namespace exec -} // namespace milvus \ No newline at end of file +} // namespace milvus diff --git a/internal/core/src/exec/expression/JsonContainsExpr.cpp b/internal/core/src/exec/expression/JsonContainsExpr.cpp index bbcc852c2a8e2..da9f3d6aaa895 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.cpp +++ b/internal/core/src/exec/expression/JsonContainsExpr.cpp @@ -35,7 +35,7 @@ PhyJsonContainsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { if (is_index_mode_) { PanicInfo( ExprInvalid, - "exists expr for json or array index mode not supportted"); + "exists expr for json or array index mode not supported"); } result = EvalJsonContainsForDataSegment(); break; diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index 627473256fff9..934a2cc114920 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -26,6 +26,7 @@ #include #include "common/Array.h" +#include "common/Common.h" #include "common/EasyAssert.h" #include "common/File.h" #include "common/FieldMeta.h" @@ -52,6 +53,8 @@ namespace milvus { constexpr size_t STRING_PADDING = 1; constexpr size_t ARRAY_PADDING = 1; +constexpr size_t BLOCK_SIZE = 8192; + class ColumnBase { public: enum MappingType { @@ -223,6 +226,19 @@ class ColumnBase { virtual SpanBase Span() const = 0; + // used for sequential access for search + virtual BufferView + GetBatchBuffer(int64_t start_offset, int64_t length) { + PanicInfo(ErrorCode::Unsupported, + "GetBatchBuffer only supported for VariableColumn"); + } + + virtual std::vector + StringViews() const { + PanicInfo(ErrorCode::Unsupported, + "StringViews only supported for VariableColumn"); + } + virtual void AppendBatch(const FieldDataPtr data) { size_t required_size = size_ + data->Size(); @@ -548,40 +564,94 @@ class VariableColumn : public ColumnBase { } VariableColumn(VariableColumn&& column) noexcept - : ColumnBase(std::move(column)), - indices_(std::move(column.indices_)), - views_(std::move(column.views_)) { + : ColumnBase(std::move(column)), indices_(std::move(column.indices_)) { } ~VariableColumn() override = default; SpanBase Span() const override { - return SpanBase(views_.data(), views_.size(), sizeof(ViewType)); + PanicInfo(ErrorCode::NotImplemented, + "span() interface is not implemented for variable column"); + } + + std::vector + StringViews() const override { + std::vector res; + char* pos = data_; + for (size_t i = 0; i < num_rows_; ++i) { + uint32_t size; + size = *reinterpret_cast(pos); + pos += sizeof(uint32_t); + res.emplace_back(std::string_view(pos, size)); + pos += size; + } + return res; } - [[nodiscard]] const std::vector& + [[nodiscard]] std::vector Views() const { - return views_; + std::vector res; + char* pos = data_; + for (size_t i = 0; i < num_rows_; ++i) { + uint32_t size; + size = *reinterpret_cast(pos); + pos += sizeof(uint32_t); + res.emplace_back(ViewType(pos, size)); + pos += size; + } + return res; + } + + BufferView + GetBatchBuffer(int64_t start_offset, int64_t length) override { + if (start_offset < 0 || start_offset > num_rows_ || + start_offset + length > num_rows_) { + PanicInfo(ErrorCode::OutOfRange, "index out of range"); + } + + char* pos = data_ + indices_[start_offset / BLOCK_SIZE]; + for (size_t j = 0; j < start_offset % BLOCK_SIZE; j++) { + uint32_t size; + size = *reinterpret_cast(pos); + pos += sizeof(uint32_t) + size; + } + + return BufferView{pos, size_ - (pos - data_)}; } ViewType operator[](const int i) const { - return views_[i]; + if (i < 0 || i > num_rows_) { + PanicInfo(ErrorCode::OutOfRange, "index out of range"); + } + size_t batch_id = i / BLOCK_SIZE; + size_t offset = i % BLOCK_SIZE; + + // located in batch start location + char* pos = data_ + indices_[batch_id]; + for (size_t j = 0; j < offset; j++) { + uint32_t size; + size = *reinterpret_cast(pos); + pos += sizeof(uint32_t) + size; + } + + uint32_t size; + size = *reinterpret_cast(pos); + return ViewType(pos + sizeof(uint32_t), size); } std::string_view RawAt(const int i) const { - return std::string_view(views_[i]); + return std::string_view((*this)[i]); } void Append(FieldDataPtr chunk) { for (auto i = 0; i < chunk->get_num_rows(); i++) { - auto data = static_cast(chunk->RawValue(i)); - indices_.emplace_back(size_); - size_ += data->size(); + auto data = static_cast(chunk->RawValue(i)); + size_ += sizeof(uint32_t) + data->size(); } load_buf_.emplace(std::move(chunk)); } @@ -604,40 +674,42 @@ class VariableColumn : public ColumnBase { auto chunk = std::move(load_buf_.front()); load_buf_.pop(); + // data_ as: |size|data|size|data...... for (auto i = 0; i < chunk->get_num_rows(); i++) { + auto current_size = (uint32_t)chunk->Size(i); + std::memcpy(data_ + size_, ¤t_size, sizeof(uint32_t)); + size_ += sizeof(uint32_t); auto data = static_cast(chunk->RawValue(i)); - std::copy_n(data->c_str(), data->size(), data_ + size_); + std::memcpy(data_ + size_, data->c_str(), data->size()); size_ += data->size(); } } } - ConstructViews(); - - // Not need indices_ after - indices_.clear(); - std::vector().swap(indices_); + shrink_indice(); } protected: void - ConstructViews() { - views_.reserve(indices_.size()); - for (size_t i = 0; i < indices_.size() - 1; i++) { - views_.emplace_back(data_ + indices_[i], - indices_[i + 1] - indices_[i]); + shrink_indice() { + std::vector tmp_indices; + tmp_indices.reserve((indices_.size() + BLOCK_SIZE - 1) / BLOCK_SIZE); + + for (size_t i = 0; i < indices_.size();) { + tmp_indices.push_back(indices_[i]); + i += BLOCK_SIZE; } - views_.emplace_back(data_ + indices_.back(), size_ - indices_.back()); + + indices_.swap(tmp_indices); } private: // loading states std::queue load_buf_{}; + // raw data index, record indices located 0, interval, 2 * interval, 3 * interval + // ... just like page index, interval set to 8192 that matches search engine's batch size std::vector indices_{}; - - // Compatible with current Span type - std::vector views_{}; }; class ArrayColumn : public ColumnBase { diff --git a/internal/core/src/mmap/Utils.h b/internal/core/src/mmap/Utils.h index d56d252c81c69..b59f594c491f5 100644 --- a/internal/core/src/mmap/Utils.h +++ b/internal/core/src/mmap/Utils.h @@ -33,47 +33,72 @@ namespace milvus { -inline size_t +#define THROW_FILE_WRITE_ERROR \ + PanicInfo(ErrorCode::FileWriteFailed, \ + fmt::format("write data to file {} failed, error code {}", \ + file.Path(), \ + strerror(errno))); + +inline void WriteFieldData(File& file, DataType data_type, const FieldDataPtr& data, + uint64_t& total_written, + std::vector& indices, std::vector>& element_indices) { - size_t total_written{0}; if (IsVariableDataType(data_type)) { switch (data_type) { case DataType::VARCHAR: case DataType::STRING: { + // write as: |size|data|size|data...... for (auto i = 0; i < data->get_num_rows(); ++i) { + indices.push_back(total_written); auto str = static_cast(data->RawValue(i)); - ssize_t written = file.Write(str->data(), str->size()); - if (written < str->size()) { - break; + ssize_t written_data_size = + file.WriteInt(uint32_t(str->size())); + if (written_data_size != sizeof(uint32_t)) { + THROW_FILE_WRITE_ERROR } - total_written += written; + total_written += written_data_size; + auto written_data = file.Write(str->data(), str->size()); + if (written_data < str->size()) { + THROW_FILE_WRITE_ERROR + } + total_written += written_data; } break; } case DataType::JSON: { + // write as: |size|data|size|data...... for (ssize_t i = 0; i < data->get_num_rows(); ++i) { + indices.push_back(total_written); auto padded_string = static_cast(data->RawValue(i))->data(); - ssize_t written = + ssize_t written_data_size = + file.WriteInt(uint32_t(padded_string.size())); + if (written_data_size != sizeof(uint32_t)) { + THROW_FILE_WRITE_ERROR + } + total_written += written_data_size; + ssize_t written_data = file.Write(padded_string.data(), padded_string.size()); - if (written < padded_string.size()) { - break; + if (written_data < padded_string.size()) { + THROW_FILE_WRITE_ERROR } - total_written += written; + total_written += written_data; } break; } case DataType::ARRAY: { + // write as: |data|data|data|data|data...... for (size_t i = 0; i < data->get_num_rows(); ++i) { + indices.push_back(total_written); auto array = static_cast(data->RawValue(i)); ssize_t written = file.Write(array->data(), array->byte_size()); if (written < array->byte_size()) { - break; + THROW_FILE_WRITE_ERROR } element_indices.emplace_back(array->get_offsets()); total_written += written; @@ -100,9 +125,15 @@ WriteFieldData(File& file, GetDataTypeName(data_type)); } } else { - total_written += file.Write(data->Data(), data->Size()); + // write as: data|data|data|data|data|data...... + size_t written = file.Write(data->Data(), data->Size()); + if (written < data->Size()) { + THROW_FILE_WRITE_ERROR + } + for (auto i = 0; i < data->get_num_rows(); i++) { + indices.emplace_back(total_written); + total_written += data->Size(i); + } } - - return total_written; } } // namespace milvus diff --git a/internal/core/src/query/GroupByOperator.h b/internal/core/src/query/GroupByOperator.h index 21162c09bfe9b..1cd0ca34a68fd 100644 --- a/internal/core/src/query/GroupByOperator.h +++ b/internal/core/src/query/GroupByOperator.h @@ -58,7 +58,7 @@ template class SealedDataGetter : public DataGetter { private: std::shared_ptr> field_data_; - std::shared_ptr> str_field_data_; + std::shared_ptr> str_field_data_; const index::ScalarIndex* field_index_; public: @@ -66,9 +66,9 @@ class SealedDataGetter : public DataGetter { FieldId& field_id) { if (segment.HasFieldData(field_id)) { if constexpr (std::is_same_v) { - auto span = segment.chunk_data(field_id, 0); - str_field_data_ = std::make_shared>( - span.data(), span.row_count()); + str_field_data_ = + std::make_shared>( + segment.chunk_view(field_id, 0)); } else { auto span = segment.chunk_data(field_id, 0); field_data_ = diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 2e3ae50fe6514..842fb2c6a1e84 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -421,6 +421,12 @@ SegmentGrowingImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const { return vec->get_span_base(chunk_id); } +std::vector +SegmentGrowingImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const { + PanicInfo(ErrorCode::NotImplemented, + "chunk view impl not implement for growing segment"); +} + int64_t SegmentGrowingImpl::num_chunk() const { auto size = get_insert_record().ack_responder_.GetAck(); diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index ce62566b08a73..80ad30df8c2ad 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -299,6 +299,11 @@ class SegmentGrowingImpl : public SegmentGrowing { limit, bitset, false_filtered_out); } + bool + is_mmap_field(FieldId id) const override { + return false; + } + protected: int64_t num_chunk() const override; @@ -306,6 +311,19 @@ class SegmentGrowingImpl : public SegmentGrowing { SpanBase chunk_data_impl(FieldId field_id, int64_t chunk_id) const override; + std::vector + chunk_view_impl(FieldId field_id, int64_t chunk_id) const override; + + BufferView + get_chunk_buffer(FieldId field_id, + int64_t chunk_id, + int64_t start_offset, + int64_t length) const override { + PanicInfo( + ErrorCode::Unsupported, + "get_chunk_buffer interface not supported for growing segment"); + } + void check_search(const query::Plan* plan) const override { Assert(plan); diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 663cfa20819be..0330d4305d97c 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -138,6 +138,46 @@ class SegmentInternalInterface : public SegmentInterface { return static_cast>(chunk_data_impl(field_id, chunk_id)); } + template + std::vector + chunk_view(FieldId field_id, int64_t chunk_id) const { + auto string_views = chunk_view_impl(field_id, chunk_id); + if constexpr (std::is_same_v) { + return std::move(string_views); + } else { + std::vector res; + res.reserve(string_views.size()); + for (const auto& view : string_views) { + res.emplace_back(view); + } + return res; + } + } + + template + std::vector + get_batch_views(FieldId field_id, + int64_t chunk_id, + int64_t start_offset, + int64_t length) const { + if (this->type() == SegmentType::Growing) { + PanicInfo(ErrorCode::Unsupported, + "get chunk views not supported for growing segment"); + } + BufferView buffer = + get_chunk_buffer(field_id, chunk_id, start_offset, length); + std::vector res; + res.reserve(length); + char* pos = buffer.data_; + for (size_t j = 0; j < length; j++) { + uint32_t size; + size = *reinterpret_cast(pos); + pos += sizeof(uint32_t); + res.emplace_back(ViewType(pos, size)); + pos += size; + } + } + template const index::ScalarIndex& chunk_scalar_index(FieldId field_id, int64_t chunk_id) const { @@ -306,11 +346,26 @@ class SegmentInternalInterface : public SegmentInterface { bool ignore_non_pk, bool fill_ids) const; + // return whether field mmap or not + virtual bool + is_mmap_field(FieldId field_id) const = 0; + protected: // internal API: return chunk_data in span virtual SpanBase chunk_data_impl(FieldId field_id, int64_t chunk_id) const = 0; + // internal API: return chunk string views in vector + virtual std::vector + chunk_view_impl(FieldId field_id, int64_t chunk_id) const = 0; + + // internal API: return buffer reference to field chunk data located from start_offset + virtual BufferView + get_chunk_buffer(FieldId field_id, + int64_t chunk_id, + int64_t start_offset, + int64_t length) const = 0; + // internal API: return chunk_index in span, support scalar index only virtual const index::IndexBase* chunk_index_impl(FieldId field_id, int64_t chunk_id) const = 0; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 6137d6f567581..985235109ffed 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -526,27 +526,17 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { auto data_type = field_meta.get_data_type(); // write the field data to disk + FieldDataPtr field_data; + uint64_t total_written = 0; std::vector indices{}; std::vector> element_indices{}; - FieldDataPtr field_data; - size_t total_written = 0; while (data.channel->pop(field_data)) { - auto written = - WriteFieldData(file, data_type, field_data, element_indices); - - AssertInfo(written == field_data->Size(), - fmt::format("failed to write data file {}, written {} but " - "total {}, err: {}", - filepath.c_str(), - written, - field_data->Size(), - strerror(errno))); - - for (auto i = 0; i < field_data->get_num_rows(); i++) { - auto size = field_data->Size(i); - indices.emplace_back(total_written); - total_written += size; - } + WriteFieldData(file, + data_type, + field_data, + total_written, + indices, + element_indices); } auto num_rows = data.row_count; @@ -596,6 +586,7 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { { std::unique_lock lck(mutex_); fields_.emplace(field_id, column); + mmap_fields_.insert(field_id); } auto ok = unlink(filepath.c_str()); @@ -667,6 +658,29 @@ SegmentSealedImpl::size_per_chunk() const { return get_row_count(); } +BufferView +SegmentSealedImpl::get_chunk_buffer(FieldId field_id, + int64_t chunk_id, + int64_t start_offset, + int64_t length) const { + std::shared_lock lck(mutex_); + AssertInfo(get_bit(field_data_ready_bitset_, field_id), + "Can't get bitset element at " + std::to_string(field_id.get())); + auto& field_meta = schema_->operator[](field_id); + if (auto it = fields_.find(field_id); it != fields_.end()) { + auto& field_data = it->second; + return field_data->GetBatchBuffer(start_offset, length); + } + PanicInfo(ErrorCode::UnexpectedError, + "get_chunk_buffer only used for variable column field"); +} + +bool +SegmentSealedImpl::is_mmap_field(FieldId field_id) const { + std::shared_lock lck(mutex_); + return mmap_fields_.find(field_id) != mmap_fields_.end(); +} + SpanBase SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const { std::shared_lock lck(mutex_); @@ -683,6 +697,20 @@ SegmentSealedImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const { return field_data->get_span_base(0); } +std::vector +SegmentSealedImpl::chunk_view_impl(FieldId field_id, int64_t chunk_id) const { + std::shared_lock lck(mutex_); + AssertInfo(get_bit(field_data_ready_bitset_, field_id), + "Can't get bitset element at " + std::to_string(field_id.get())); + auto& field_meta = schema_->operator[](field_id); + if (auto it = fields_.find(field_id); it != fields_.end()) { + auto& field_data = it->second; + return field_data->StringViews(); + } + PanicInfo(ErrorCode::UnexpectedError, + "chunk_view_impl only used for variable column field "); +} + const index::IndexBase* SegmentSealedImpl::chunk_index_impl(FieldId field_id, int64_t chunk_id) const { AssertInfo(scalar_indexings_.find(field_id) != scalar_indexings_.end(), diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 2f79645337623..8880b57c1061d 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -149,6 +149,9 @@ class SegmentSealedImpl : public SegmentSealed { const int64_t* seg_offsets, int64_t count) const override; + bool + is_mmap_field(FieldId id) const override; + void ClearData(); @@ -157,6 +160,15 @@ class SegmentSealedImpl : public SegmentSealed { SpanBase chunk_data_impl(FieldId field_id, int64_t chunk_id) const override; + std::vector + chunk_view_impl(FieldId field_id, int64_t chunk_id) const override; + + BufferView + get_chunk_buffer(FieldId field_id, + int64_t chunk_id, + int64_t start_offset, + int64_t length) const override; + const index::IndexBase* chunk_index_impl(FieldId field_id, int64_t chunk_id) const override; @@ -307,6 +319,7 @@ class SegmentSealedImpl : public SegmentSealed { SchemaPtr schema_; int64_t id_; std::unordered_map> fields_; + std::unordered_set mmap_fields_; // only useful in binlog IndexMetaPtr col_index_meta_; diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index d8f7fc29b34e4..ca39ba8b8e39a 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -4171,7 +4171,7 @@ TEST(CApiTest, SealedSegment_Update_Field_Size) { int64_t total_size = 0; for (int i = 0; i < N; ++i) { auto str = "string_data_" + std::to_string(i); - total_size += str.size(); + total_size += str.size() + sizeof(uint32_t); str_datas.emplace_back(str); } auto res = LoadFieldRawData(segment, str_fid.get(), str_datas.data(), N); diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 8032d6e2cb5d2..b022cbd993dca 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -498,7 +498,8 @@ TEST(Sealed, LoadFieldData) { ASSERT_EQ(segment->num_chunk_index(str_id), 0); auto chunk_span1 = segment->chunk_data(counter_id, 0); auto chunk_span2 = segment->chunk_data(double_id, 0); - auto chunk_span3 = segment->chunk_data(str_id, 0); + auto chunk_span3 = + segment->get_batch_views(str_id, 0, 0, N); auto ref1 = dataset.get_col(counter_id); auto ref2 = dataset.get_col(double_id); auto ref3 = dataset.get_col(str_id)->scalars().string_data().data(); @@ -624,7 +625,8 @@ TEST(Sealed, ClearData) { ASSERT_EQ(segment->num_chunk_index(str_id), 0); auto chunk_span1 = segment->chunk_data(counter_id, 0); auto chunk_span2 = segment->chunk_data(double_id, 0); - auto chunk_span3 = segment->chunk_data(str_id, 0); + auto chunk_span3 = + segment->get_batch_views(str_id, 0, 0, N); auto ref1 = dataset.get_col(counter_id); auto ref2 = dataset.get_col(double_id); auto ref3 = dataset.get_col(str_id)->scalars().string_data().data(); @@ -726,7 +728,8 @@ TEST(Sealed, LoadFieldDataMmap) { ASSERT_EQ(segment->num_chunk_index(str_id), 0); auto chunk_span1 = segment->chunk_data(counter_id, 0); auto chunk_span2 = segment->chunk_data(double_id, 0); - auto chunk_span3 = segment->chunk_data(str_id, 0); + auto chunk_span3 = + segment->get_batch_views(str_id, 0, 0, N); auto ref1 = dataset.get_col(counter_id); auto ref2 = dataset.get_col(double_id); auto ref3 = dataset.get_col(str_id)->scalars().string_data().data();