diff --git a/internal/core/src/common/CMakeLists.txt b/internal/core/src/common/CMakeLists.txt index 4330b43f8099f..53eb53787ac17 100644 --- a/internal/core/src/common/CMakeLists.txt +++ b/internal/core/src/common/CMakeLists.txt @@ -24,6 +24,9 @@ set(COMMON_SRC EasyAssert.cpp FieldData.cpp RegexQuery.cpp + ChunkTarget.cpp + Chunk.cpp + ChunkWriter.cpp ) add_library(milvus_common SHARED ${COMMON_SRC}) diff --git a/internal/core/src/common/Chunk.cpp b/internal/core/src/common/Chunk.cpp new file mode 100644 index 0000000000000..8e957afd18748 --- /dev/null +++ b/internal/core/src/common/Chunk.cpp @@ -0,0 +1,61 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include +#include "common/Array.h" +#include "common/Span.h" +#include "common/Types.h" +#include "common/Chunk.h" + +namespace milvus { + +std::vector +StringChunk::StringViews() const { + std::vector ret; + for (int i = 0; i < row_nums_ - 1; i++) { + ret.emplace_back(data_ + offsets_[i], offsets_[i + 1] - offsets_[i]); + } + ret.emplace_back(data_ + offsets_[row_nums_ - 1], + size_ - MMAP_STRING_PADDING - offsets_[row_nums_ - 1]); + return ret; +} + +void +ArrayChunk::ConstructViews() { + views_.reserve(row_nums_); + + for (int i = 0; i < row_nums_; ++i) { + auto data_ptr = data_ + offsets_[i]; + auto next_data_ptr = i == row_nums_ - 1 + ? 
data_ + size_ - MMAP_ARRAY_PADDING + : data_ + offsets_[i + 1]; + auto offsets_len = lens_[i] * sizeof(uint64_t); + std::vector element_indices = {}; + if (IsStringDataType(element_type_)) { + std::vector tmp( + reinterpret_cast(data_ptr), + reinterpret_cast(data_ptr + offsets_len)); + element_indices = std::move(tmp); + } + views_.emplace_back(data_ptr + offsets_len, + next_data_ptr - data_ptr - offsets_len, + element_type_, + std::move(element_indices)); + } +} + +SpanBase +ArrayChunk::Span() const { + return SpanBase(views_.data(), views_.size(), sizeof(ArrayView)); +} + +} // namespace milvus diff --git a/internal/core/src/common/Chunk.h b/internal/core/src/common/Chunk.h new file mode 100644 index 0000000000000..facc0cd4c0408 --- /dev/null +++ b/internal/core/src/common/Chunk.h @@ -0,0 +1,148 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#pragma once + +#include +#include +#include +#include +#include +#include +#include "arrow/array/array_base.h" +#include "arrow/record_batch.h" +#include "common/Array.h" +#include "common/ChunkTarget.h" +#include "common/FieldDataInterface.h" +#include "common/Json.h" +#include "common/Span.h" +#include "knowhere/sparse_utils.h" +#include "simdjson/common_defs.h" +#include "sys/mman.h" +namespace milvus { +constexpr size_t MMAP_STRING_PADDING = 1; +constexpr size_t MMAP_ARRAY_PADDING = 1; +class Chunk { + public: + Chunk() = default; + Chunk(int64_t row_nums, char* data, size_t size) + : row_nums_(row_nums), data_(data), size_(size) { + } + virtual ~Chunk() { + munmap(data_, size_); + } + + protected: + char* data_; + int64_t row_nums_; + size_t size_; +}; + +// for fixed size data, includes fixed size array +template +class FixedWidthChunk : public Chunk { + public: + FixedWidthChunk(int32_t row_nums, int32_t dim, char* data, size_t size) + : Chunk(row_nums, data, size), dim_(dim){}; + + milvus::SpanBase + Span() const { + auto null_bitmap_bytes_num = (row_nums_ + 7) / 8; + return milvus::SpanBase( + data_ + null_bitmap_bytes_num, row_nums_, sizeof(T) * dim_); + } + + private: + int dim_; +}; + +class StringChunk : public Chunk { + public: + StringChunk() = default; + StringChunk(int32_t row_nums, char* data, size_t size) + : Chunk(row_nums, data, size) { + auto null_bitmap_bytes_num = (row_nums + 7) / 8; + offsets_ = reinterpret_cast(data + null_bitmap_bytes_num); + } + + std::vector + StringViews() const; + + protected: + uint64_t* offsets_; +}; + +using JSONChunk = StringChunk; + +class ArrayChunk : public Chunk { + public: + ArrayChunk(int32_t row_nums, + char* data, + size_t size, + milvus::DataType element_type) + : Chunk(row_nums, data, size), element_type_(element_type) { + auto null_bitmap_bytes_num = (row_nums + 7) / 8; + offsets_ = 
reinterpret_cast(data + null_bitmap_bytes_num); + lens_ = offsets_ + row_nums; + ConstructViews(); + } + + SpanBase + Span() const; + + void + ConstructViews(); + + private: + milvus::DataType element_type_; + uint64_t* offsets_; + uint64_t* lens_; + std::vector views_; +}; + +class SparseFloatVectorChunk : public Chunk { + public: + SparseFloatVectorChunk(int32_t row_nums, char* data, size_t size) + : Chunk(row_nums, data, size) { + vec_.resize(row_nums); + auto null_bitmap_bytes_num = (row_nums + 7) / 8; + auto offsets_ptr = + reinterpret_cast(data + null_bitmap_bytes_num); + for (int i = 0; i < row_nums; i++) { + int vec_size = 0; + if (i == row_nums - 1) { + vec_size = size - offsets_ptr[i]; + } else { + vec_size = offsets_ptr[i + 1] - offsets_ptr[i]; + } + + vec_[i] = { + vec_size / knowhere::sparse::SparseRow::element_size(), + (uint8_t*)(data + offsets_ptr[i]), + false}; + } + } + + const char* + Data() const { + return static_cast(static_cast(vec_.data())); + } + + // only for test + std::vector>& + Vec() { + return vec_; + } + + private: + std::vector> vec_; +}; +} // namespace milvus \ No newline at end of file diff --git a/internal/core/src/common/ChunkFileWriter.cpp b/internal/core/src/common/ChunkFileWriter.cpp new file mode 100644 index 0000000000000..7503f5c147607 --- /dev/null +++ b/internal/core/src/common/ChunkFileWriter.cpp @@ -0,0 +1,36 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + + +#include "common/ChunkFileWriter.h" +#include "common/ChunkWriter.h" +namespace milvus { +ChunkFileWriter::ChunkFileWriter(std::string& file_path) + : file_(File::Open(file_path, O_RDWR | O_CREAT | O_TRUNC)) { +} + +void +ChunkFileWriter::write_chunk(std::shared_ptr r) { + // FIXME + size_t file_offset = file_.Seek(0, SEEK_END); + auto chunk = create_chunk(field_meta_, dim_, file_, file_offset, r); + // TODO: stat_writer_.write(chunk); + rep_.chunks.push_back(*chunk); +} + +FileRep +ChunkFileWriter::finish() { + // TODO: stat_writer_.finish(); + // rep_.stat_chunk = stat_writer_.get(); + return rep_; +} + +} // namespace milvus \ No newline at end of file diff --git a/internal/core/src/common/ChunkFileWriter.h b/internal/core/src/common/ChunkFileWriter.h new file mode 100644 index 0000000000000..028fb32e2022b --- /dev/null +++ b/internal/core/src/common/ChunkFileWriter.h @@ -0,0 +1,44 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#pragma once +#include +#include "arrow/record_batch.h" +#include "arrow/table_builder.h" +#include "common/Chunk.h" +#include "common/ChunkTarget.h" +#include "common/FieldMeta.h" +namespace milvus { +class StatisticsChunkWriter; +class StatisticsChunk; + +class ChunkFileWriter { + public: + ChunkFileWriter() = default; + ChunkFileWriter(std::string& file_path); + struct FileRep { + std::vector chunks; + }; + void + write_chunk(std::shared_ptr r); + + FileRep + finish(); + + private: + FieldMeta& field_meta_; + int dim_; + StatisticsChunkWriter stat_writer_; + File file_; + FileRep rep_; +}; + +} // namespace milvus \ No newline at end of file diff --git a/internal/core/src/common/ChunkTarget.cpp b/internal/core/src/common/ChunkTarget.cpp new file mode 100644 index 0000000000000..abe47dd819f8d --- /dev/null +++ b/internal/core/src/common/ChunkTarget.cpp @@ -0,0 +1,74 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include +#include "common/EasyAssert.h" +#include + +namespace milvus { +void +MemChunkTarget::write(const void* data, size_t size, bool append) { + AssertInfo(size + size_ <= cap_, "can not exceed target capacity"); + std::memcpy(data_ + size_, data, size); + size_ += append ? 
size : 0; +} + +void +MemChunkTarget::skip(size_t size) { + size_ += size; +} + +void +MemChunkTarget::seek(size_t offset) { + size_ = offset; +} + +std::pair +MemChunkTarget::get() { + return {data_, cap_}; +} + +size_t +MemChunkTarget::tell() { + return size_; +} + +void +MmapChunkTarget::write(const void* data, size_t size, bool append) { + auto n = file_.Write(data, size); + AssertInfo(n != -1, "failed to write data to file"); + size_ += append ? size : 0; +} + +void +MmapChunkTarget::skip(size_t size) { + file_.Seek(size, SEEK_CUR); + size_ += size; +} + +void +MmapChunkTarget::seek(size_t offset) { + file_.Seek(offset_ + offset, SEEK_SET); +} + +std::pair +MmapChunkTarget::get() { + auto m = mmap( + nullptr, size_, PROT_READ, MAP_SHARED, file_.Descriptor(), offset_); + return {(char*)m, size_}; +} + +size_t +MmapChunkTarget::tell() { + return size_; +} +} // namespace milvus \ No newline at end of file diff --git a/internal/core/src/common/ChunkTarget.h b/internal/core/src/common/ChunkTarget.h new file mode 100644 index 0000000000000..3419e40cb202a --- /dev/null +++ b/internal/core/src/common/ChunkTarget.h @@ -0,0 +1,96 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#pragma once +#include +#include +#include +#include +#include "common/File.h" +namespace milvus { +class ChunkTarget { + public: + virtual void + write(const void* data, size_t size, bool append = true) = 0; + + virtual void + skip(size_t size) = 0; + + virtual void + seek(size_t offset) = 0; + + virtual std::pair + get() = 0; + + virtual ~ChunkTarget() = default; + + virtual size_t + tell() = 0; +}; + +class MmapChunkTarget : public ChunkTarget { + public: + MmapChunkTarget(File& file, size_t offset) : file_(file), offset_(offset) { + } + void + write(const void* data, size_t size, bool append = true) override; + + void + skip(size_t size) override; + + void + seek(size_t offset) override; + + std::pair + get() override; + + size_t + tell() override; + + private: + File& file_; + size_t offset_ = 0; + size_t size_ = 0; +}; + +class MemChunkTarget : public ChunkTarget { + public: + MemChunkTarget(size_t cap) : cap_(cap) { + data_ = reinterpret_cast(mmap(nullptr, + cap, + PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANON, + -1, + 0)); + } + + void + write(const void* data, size_t size, bool append = true) override; + + void + skip(size_t size) override; + + void + seek(size_t offset) override; + + std::pair + get() override; + + size_t + tell() override; + + private: + char* data_; // no need to delete in destructor, will be deleted by Chunk + size_t cap_; + size_t size_ = 0; +}; + +} // namespace milvus \ No newline at end of file diff --git a/internal/core/src/common/ChunkWriter.cpp b/internal/core/src/common/ChunkWriter.cpp new file mode 100644 index 0000000000000..52b339feb2a23 --- /dev/null +++ b/internal/core/src/common/ChunkWriter.cpp @@ -0,0 +1,362 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include "common/ChunkWriter.h" +#include +#include +#include +#include +#include "arrow/array/array_binary.h" +#include "arrow/array/array_primitive.h" +#include "arrow/record_batch.h" +#include "common/Chunk.h" +#include "common/EasyAssert.h" +#include "common/FieldDataInterface.h" +#include "common/Types.h" +#include "common/VectorTrait.h" +#include "simdjson/common_defs.h" +#include "simdjson/padded_string.h" +namespace milvus { + +void +StringChunkWriter::write(std::shared_ptr data) { + auto size = 0; + std::vector strs; + std::vector> null_bitmaps; + for (auto batch : *data) { + auto data = batch.ValueOrDie()->column(0); + auto array = std::dynamic_pointer_cast(data); + for (int i = 0; i < array->length(); i++) { + auto str = array->GetView(i); + strs.push_back(str); + size += str.size(); + } + auto null_bitmap_n = (data->length() + 7) / 8; + null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); + size += null_bitmap_n; + row_nums_ += array->length(); + } + size += sizeof(uint64_t) * row_nums_ + MMAP_STRING_PADDING; + if (file_) { + target_ = std::make_shared(*file_, file_offset_); + } else { + target_ = std::make_shared(size); + } + + // chunk layout: null bitmap, offset1, offset2, ..., offsetn, str1, str2, ..., strn, padding + // write null bitmaps + for (auto [data, size] : null_bitmaps) { + if (data == nullptr) { + std::vector null_bitmap(size, 0xff); + target_->write(null_bitmap.data(), size); + } else { + target_->write(data, size); + } + } + + // write data + offsets_pos_ = target_->tell(); + target_->skip(sizeof(uint64_t) * 
row_nums_); + + for (auto str : strs) { + offsets_.push_back(target_->tell()); + target_->write(str.data(), str.size()); + } +} + +std::shared_ptr +StringChunkWriter::finish() { + // write padding, maybe not needed anymore + // FIXME + char padding[MMAP_STRING_PADDING]; + target_->write(padding, MMAP_STRING_PADDING); + + // seek back to write offsets + target_->seek(offsets_pos_); + target_->write(offsets_.data(), offsets_.size() * sizeof(uint64_t)); + auto [data, size] = target_->get(); + return std::make_shared(row_nums_, data, size); +} + +void +JSONChunkWriter::write(std::shared_ptr data) { + auto size = 0; + + std::vector jsons; + std::vector> null_bitmaps; + for (auto batch : *data) { + auto data = batch.ValueOrDie()->column(0); + auto array = std::dynamic_pointer_cast(data); + for (int i = 0; i < array->length(); i++) { + auto str = array->GetView(i); + auto json = Json(simdjson::padded_string(str)); + size += json.data().size(); + jsons.push_back(std::move(json)); + } + AssertInfo(data->length() % 8 == 0, + "String length should be multiple of 8"); + auto null_bitmap_n = (data->length() + 7) / 8; + null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); + size += null_bitmap_n; + row_nums_ += array->length(); + } + size += sizeof(uint64_t) * row_nums_ + simdjson::SIMDJSON_PADDING; + if (file_) { + target_ = std::make_shared(*file_, file_offset_); + } else { + target_ = std::make_shared(size); + } + + // chunk layout: null bitmaps, offset1, offset2, ... 
,json1, json2, ..., jsonn + // write null bitmaps + for (auto [data, size] : null_bitmaps) { + if (data == nullptr) { + std::vector null_bitmap(size, 0xff); + target_->write(null_bitmap.data(), size); + } else { + target_->write(data, size); + } + } + + offsets_pos_ = target_->tell(); + target_->skip(sizeof(uint64_t) * row_nums_); + + // write data + for (auto json : jsons) { + offsets_.push_back(target_->tell()); + target_->write(json.data().data(), json.data().size()); + } +} + +std::shared_ptr +JSONChunkWriter::finish() { + char padding[simdjson::SIMDJSON_PADDING]; + target_->write(padding, simdjson::SIMDJSON_PADDING); + + // write offsets and padding + target_->seek(offsets_pos_); + target_->write(offsets_.data(), offsets_.size() * sizeof(uint64_t)); + auto [data, size] = target_->get(); + return std::make_shared(row_nums_, data, size); +} + +void +ArrayChunkWriter::write(std::shared_ptr data) { + auto size = 0; + + std::vector arrays; + std::vector> null_bitmaps; + for (auto batch : *data) { + auto data = batch.ValueOrDie()->column(0); + auto array = std::dynamic_pointer_cast(data); + for (int i = 0; i < array->length(); i++) { + auto str = array->GetView(i); + ScalarArray scalar_array; + scalar_array.ParseFromArray(str.data(), str.size()); + auto arr = Array(scalar_array); + size += arr.byte_size(); + arrays.push_back(std::move(arr)); + // element offsets size + size += sizeof(uint64_t) * arr.length(); + } + row_nums_ += array->length(); + auto null_bitmap_n = (data->length() + 7) / 8; + null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); + size += null_bitmap_n; + } + + auto is_string = IsStringDataType(element_type_); + // offsets + lens + size += is_string ? 
sizeof(uint64_t) * row_nums_ * 2 + MMAP_ARRAY_PADDING + : sizeof(uint64_t) * row_nums_ + MMAP_ARRAY_PADDING; + if (file_) { + target_ = std::make_shared(*file_, file_offset_); + } else { + target_ = std::make_shared(size); + } + + // chunk layout: nullbitmaps, offsets, elem_off1, elem_off2, .. data1, data2, ..., datan, padding + for (auto [data, size] : null_bitmaps) { + if (data == nullptr) { + std::vector null_bitmap(size, 0xff); + target_->write(null_bitmap.data(), size); + } else { + target_->write(data, size); + } + } + + offsets_pos_ = target_->tell(); + target_->skip(sizeof(uint64_t) * row_nums_ * 2); + for (auto& arr : arrays) { + // write elements offsets + offsets_.push_back(target_->tell()); + if (is_string) { + target_->write(arr.get_offsets().data(), + arr.get_offsets().size() * sizeof(uint64_t)); + } + lens_.push_back(arr.length()); + target_->write(arr.data(), arr.byte_size()); + } +} + +std::shared_ptr +ArrayChunkWriter::finish() { + char padding[MMAP_ARRAY_PADDING]; + target_->write(padding, MMAP_ARRAY_PADDING); + + // write offsets and lens + target_->seek(offsets_pos_); + for (size_t i = 0; i < offsets_.size(); i++) { + target_->write(&offsets_[i], sizeof(uint64_t)); + target_->write(&lens_[i], sizeof(uint64_t)); + } + auto [data, size] = target_->get(); + return std::make_shared(row_nums_, data, size, element_type_); +} + +void +SparseFloatVectorChunkWriter::write( + std::shared_ptr data) { + auto size = 0; + std::vector strs; + std::vector> null_bitmaps; + for (auto batch : *data) { + auto data = batch.ValueOrDie()->column(0); + auto array = std::dynamic_pointer_cast(data); + for (int i = 0; i < array->length(); i++) { + auto str = array->GetView(i); + strs.push_back(str); + size += str.size(); + } + auto null_bitmap_n = (data->length() + 7) / 8; + null_bitmaps.emplace_back(data->null_bitmap_data(), null_bitmap_n); + size += null_bitmap_n; + row_nums_ += array->length(); + } + size += sizeof(uint64_t) * row_nums_; + if (file_) { + target_ = 
std::make_shared(*file_, file_offset_); + } else { + target_ = std::make_shared(size); + } + + // chunk layout: null bitmap, offset1, offset2, ..., offsetn, str1, str2, ..., strn + // write null bitmaps + for (auto [data, size] : null_bitmaps) { + if (data == nullptr) { + std::vector null_bitmap(size, 0xff); + target_->write(null_bitmap.data(), size); + } else { + target_->write(data, size); + } + } + + // write data + offsets_pos_ = target_->tell(); + target_->skip(sizeof(uint64_t) * row_nums_); + + for (auto str : strs) { + offsets_.push_back(target_->tell()); + target_->write(str.data(), str.size()); + } +} + +std::shared_ptr +SparseFloatVectorChunkWriter::finish() { + // seek back to write offsets + target_->seek(offsets_pos_); + target_->write(offsets_.data(), offsets_.size() * sizeof(uint64_t)); + auto [data, size] = target_->get(); + return std::make_shared(row_nums_, data, size); +} + +std::shared_ptr +create_chunk(const FieldMeta& field_meta, + int dim, + std::shared_ptr r) { + std::shared_ptr w; + + switch (field_meta.get_data_type()) { + case milvus::DataType::BOOL: { + w = std::make_shared>(dim); + break; + } + case milvus::DataType::INT8: { + w = std::make_shared>(dim); + break; + } + case milvus::DataType::INT16: { + w = std::make_shared>(dim); + break; + } + case milvus::DataType::INT32: { + w = std::make_shared>(dim); + break; + } + case milvus::DataType::INT64: { + w = std::make_shared>(dim); + break; + } + case milvus::DataType::FLOAT: { + w = std::make_shared>(dim); + break; + } + case milvus::DataType::DOUBLE: { + w = std::make_shared>(dim); + break; + } + case milvus::DataType::VECTOR_FLOAT: { + w = std::make_shared< + ChunkWriter>(dim); + break; + } + case milvus::DataType::VECTOR_BINARY: { + w = std::make_shared< + ChunkWriter>(dim / 8); + break; + } + case milvus::DataType::VECTOR_FLOAT16: { + w = std::make_shared< + ChunkWriter>(dim); + break; + } + case milvus::DataType::VECTOR_BFLOAT16: { + w = std::make_shared< + ChunkWriter>(dim); + 
break; + } + case milvus::DataType::VARCHAR: + case milvus::DataType::STRING: { + w = std::make_shared(); + break; + } + case milvus::DataType::JSON: { + w = std::make_shared(); + break; + } + case milvus::DataType::ARRAY: { + w = std::make_shared( + field_meta.get_element_type()); + break; + } + case milvus::DataType::VECTOR_SPARSE_FLOAT: { + w = std::make_shared(); + break; + } + default: + PanicInfo(Unsupported, "Unsupported data type"); + } + + w->write(r); + return w->finish(); +} + +} // namespace milvus \ No newline at end of file diff --git a/internal/core/src/common/ChunkWriter.h b/internal/core/src/common/ChunkWriter.h new file mode 100644 index 0000000000000..a16b9bae47448 --- /dev/null +++ b/internal/core/src/common/ChunkWriter.h @@ -0,0 +1,239 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#pragma once +#include +#include +#include +#include +#include +#include "arrow/array/array_primitive.h" +#include "common/ChunkTarget.h" +#include "arrow/record_batch.h" +#include "common/Chunk.h" +#include "common/EasyAssert.h" +#include "common/FieldDataInterface.h" +namespace milvus { + +class ChunkWriterBase { + public: + ChunkWriterBase() = default; + + ChunkWriterBase(File& file, size_t offset) + : file_(&file), file_offset_(offset) { + } + + virtual void + write(std::shared_ptr data) = 0; + + virtual std::shared_ptr + finish() = 0; + + std::pair + get_data() { + return target_->get(); + } + + protected: + int row_nums_ = 0; + File* file_ = nullptr; + size_t file_offset_ = 0; + std::shared_ptr target_; +}; + +template +class ChunkWriter : public ChunkWriterBase { + public: + ChunkWriter(int dim) : dim_(dim) { + } + + ChunkWriter(int dim, File& file, size_t offset) + : ChunkWriterBase(file, offset), dim_(dim){}; + + void + write(std::shared_ptr data) override { + auto size = 0; + auto row_nums = 0; + + auto batch_vec = data->ToRecordBatches().ValueOrDie(); + + for (auto batch : batch_vec) { + row_nums += batch->num_rows(); + auto data = batch->column(0); + auto array = std::dynamic_pointer_cast(data); + auto null_bitmap_n = (data->length() + 7) / 8; + size += null_bitmap_n + array->length() * dim_ * sizeof(T); + } + + row_nums_ = row_nums; + if (file_) { + target_ = std::make_shared(*file_, file_offset_); + } else { + target_ = std::make_shared(size); + } + + // chunk layout: nullbitmap, data1, data2, ..., datan + for (auto batch : batch_vec) { + auto data = batch->column(0); + auto null_bitmap = data->null_bitmap_data(); + auto null_bitmap_n = (data->length() + 7) / 8; + if (null_bitmap) { + target_->write(null_bitmap, null_bitmap_n); + } else { + std::vector null_bitmap(null_bitmap_n, 0xff); + target_->write(null_bitmap.data(), null_bitmap_n); + } + } + + 
for (auto batch : batch_vec) { + auto data = batch->column(0); + auto array = std::dynamic_pointer_cast(data); + auto data_ptr = array->raw_values(); + target_->write(data_ptr, array->length() * dim_ * sizeof(T)); + } + } + + std::shared_ptr + finish() override { + auto [data, size] = target_->get(); + return std::make_shared>( + row_nums_, dim_, data, size); + } + + private: + int dim_; +}; + +template <> +inline void +ChunkWriter::write( + std::shared_ptr data) { + auto size = 0; + auto row_nums = 0; + auto batch_vec = data->ToRecordBatches().ValueOrDie(); + + for (auto batch : batch_vec) { + row_nums += batch->num_rows(); + auto data = batch->column(0); + auto array = std::dynamic_pointer_cast(data); + size += array->length() * dim_; + size += (data->length() + 7) / 8; + } + row_nums_ = row_nums; + if (file_) { + target_ = std::make_shared(*file_, file_offset_); + } else { + target_ = std::make_shared(size); + } + // chunk layout: nullbitmap, data1, data2, ..., datan + for (auto batch : batch_vec) { + auto data = batch->column(0); + auto null_bitmap = data->null_bitmap_data(); + auto null_bitmap_n = (data->length() + 7) / 8; + if (null_bitmap) { + target_->write(null_bitmap, null_bitmap_n); + } else { + std::vector null_bitmap(null_bitmap_n, 0xff); + target_->write(null_bitmap.data(), null_bitmap_n); + } + } + + for (auto batch : batch_vec) { + auto data = batch->column(0); + auto array = std::dynamic_pointer_cast(data); + for (int i = 0; i < array->length(); i++) { + auto value = array->Value(i); + target_->write(&value, sizeof(bool)); + } + } +} + +class StringChunkWriter : public ChunkWriterBase { + public: + using ChunkWriterBase::ChunkWriterBase; + + void + write(std::shared_ptr data) override; + + std::shared_ptr + finish() override; + + protected: + std::vector offsets_; + size_t offsets_pos_ = 0; +}; + +class JSONChunkWriter : public ChunkWriterBase { + public: + using ChunkWriterBase::ChunkWriterBase; + + void + write(std::shared_ptr data) override; + + 
std::shared_ptr + finish() override; + + private: + std::vector offsets_; + size_t offsets_pos_ = 0; +}; + +class ArrayChunkWriter : public ChunkWriterBase { + public: + ArrayChunkWriter(const milvus::DataType element_type) + : element_type_(element_type) { + } + ArrayChunkWriter(const milvus::DataType element_type, + File& file, + size_t offset) + : ChunkWriterBase(file, offset), element_type_(element_type) { + } + + void + write(std::shared_ptr data) override; + + std::shared_ptr + finish() override; + + private: + const milvus::DataType element_type_; + std::vector offsets_; + std::vector lens_; + size_t offsets_pos_; +}; + +class SparseFloatVectorChunkWriter : public ChunkWriterBase { + public: + using ChunkWriterBase::ChunkWriterBase; + + void + write(std::shared_ptr data) override; + + std::shared_ptr + finish() override; + + private: + uint64_t offsets_pos_ = 0; + std::vector offsets_; +}; + +std::shared_ptr +create_chunk(const FieldMeta& field_meta, + int dim, + std::shared_ptr r); + +std::shared_ptr +create_chunk(const FieldMeta& field_meta, + int dim, + File& file, + size_t file_offset, + std::shared_ptr r); +} // namespace milvus \ No newline at end of file diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 6bafa4f9f7651..246b844a6f055 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -72,6 +72,7 @@ set(MILVUS_TEST_FILES test_chunk_vector.cpp test_mmap_chunk_manager.cpp test_monitor.cpp + test_chunk.cpp ) if ( INDEX_ENGINE STREQUAL "cardinal" ) diff --git a/internal/core/unittest/test_chunk.cpp b/internal/core/unittest/test_chunk.cpp new file mode 100644 index 0000000000000..543284d16b1ab --- /dev/null +++ b/internal/core/unittest/test_chunk.cpp @@ -0,0 +1,186 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License
+
+// NOTE(review): the five bare #include directives below have lost their
+// header names in this copy of the patch; restore them from the original
+// commit before applying.
+#include
+#include
+#include
+#include
+#include
+
+#include "common/Chunk.h"
+#include "common/ChunkWriter.h"
+#include "common/EasyAssert.h"
+#include "common/FieldDataInterface.h"
+#include "common/FieldMeta.h"
+#include "common/Types.h"
+#include "storage/Event.h"
+#include "storage/Util.h"
+#include "test_utils/Constants.h"
+#include "test_utils/DataGen.h"
+using namespace milvus;
+
+// Every test below follows the same recipe:
+//   1. fill a FieldData and serialize it through storage::InsertEventData;
+//   2. skip the first 2 * sizeof(Timestamp) bytes of the serialized event
+//      (presumably its start/end timestamps -- confirm against
+//      InsertEventData::Serialize) and open the remainder as parquet;
+//   3. feed the resulting RecordBatchReader to create_chunk() and compare
+//      the chunk's contents with the input.
+// Setup failures use ASSERT_* because each following statement dereferences
+// the object the failed step was supposed to produce.
+
+// Round-trips five int64 values and reads them back through the chunk's span.
+TEST(chunk, test_int64_field) {
+    FixedVector<int64_t> data = {1, 2, 3, 4, 5};
+    auto field_data =
+        milvus::storage::CreateFieldData(storage::DataType::INT64);
+    field_data->FillFieldData(data.data(), data.size());
+    storage::InsertEventData event_data;
+    event_data.field_data = field_data;
+    // ser_data must stay alive for the whole test: the reader below is built
+    // from a raw pointer into it.
+    auto ser_data = event_data.Serialize();
+    auto buffer = std::make_shared<arrow::io::BufferReader>(
+        ser_data.data() + 2 * sizeof(milvus::Timestamp),
+        ser_data.size() - 2 * sizeof(milvus::Timestamp));
+
+    parquet::arrow::FileReaderBuilder reader_builder;
+    auto s = reader_builder.Open(buffer);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+    s = reader_builder.Build(&arrow_reader);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+
+    std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
+    s = arrow_reader->GetRecordBatchReader(&rb_reader);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+
+    FieldMeta field_meta(
+        FieldName("a"), milvus::FieldId(1), DataType::INT64, false);
+    auto chunk = create_chunk(field_meta, 1, rb_reader);
+    auto fixed_chunk =
+        std::dynamic_pointer_cast<FixedWidthChunk<int64_t>>(chunk);
+    ASSERT_TRUE(fixed_chunk != nullptr);
+    auto span = fixed_chunk->Span();
+    // Fatal on mismatch: the loop below indexes the span by data.size().
+    ASSERT_EQ(static_cast<size_t>(span.row_count()), data.size());
+    const auto* base = static_cast<const char*>(span.data());
+    for (size_t i = 0; i < data.size(); ++i) {
+        auto n = *reinterpret_cast<const int64_t*>(
+            base + i * span.element_sizeof());
+        EXPECT_EQ(n, data[i]);
+    }
+}
+
+// Round-trips five short strings and compares them with the chunk's views.
+// NOTE(review): the field data is created as storage::DataType::VARCHAR while
+// the FieldMeta declares DataType::STRING -- confirm this is intended.
+TEST(chunk, test_variable_field) {
+    FixedVector<std::string> data = {
+        "test1", "test2", "test3", "test4", "test5"};
+    auto field_data =
+        milvus::storage::CreateFieldData(storage::DataType::VARCHAR);
+    field_data->FillFieldData(data.data(), data.size());
+
+    storage::InsertEventData event_data;
+    event_data.field_data = field_data;
+    auto ser_data = event_data.Serialize();
+    auto buffer = std::make_shared<arrow::io::BufferReader>(
+        ser_data.data() + 2 * sizeof(milvus::Timestamp),
+        ser_data.size() - 2 * sizeof(milvus::Timestamp));
+
+    parquet::arrow::FileReaderBuilder reader_builder;
+    auto s = reader_builder.Open(buffer);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+    s = reader_builder.Build(&arrow_reader);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+
+    std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
+    s = arrow_reader->GetRecordBatchReader(&rb_reader);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+
+    FieldMeta field_meta(
+        FieldName("a"), milvus::FieldId(1), DataType::STRING, false);
+    auto chunk = create_chunk(field_meta, 1, rb_reader);
+    auto string_chunk = std::dynamic_pointer_cast<StringChunk>(chunk);
+    ASSERT_TRUE(string_chunk != nullptr);
+    auto views = string_chunk->StringViews();
+    // Fatal on mismatch: views[i] below would otherwise read out of bounds.
+    ASSERT_EQ(views.size(), data.size());
+    for (size_t i = 0; i < data.size(); ++i) {
+        EXPECT_EQ(views[i], data[i]);
+    }
+}
+
+// Round-trips a single row holding an array of five strings and checks every
+// element of the resulting ArrayView.
+TEST(chunk, test_array) {
+    milvus::proto::schema::ScalarField field_string_data;
+    field_string_data.mutable_string_data()->add_data("test_array1");
+    field_string_data.mutable_string_data()->add_data("test_array2");
+    field_string_data.mutable_string_data()->add_data("test_array3");
+    field_string_data.mutable_string_data()->add_data("test_array4");
+    field_string_data.mutable_string_data()->add_data("test_array5");
+    auto string_array = Array(field_string_data);
+    FixedVector<Array> data = {string_array};
+    auto field_data =
+        milvus::storage::CreateFieldData(storage::DataType::ARRAY);
+    field_data->FillFieldData(data.data(), data.size());
+    storage::InsertEventData event_data;
+    event_data.field_data = field_data;
+    auto ser_data = event_data.Serialize();
+    auto buffer = std::make_shared<arrow::io::BufferReader>(
+        ser_data.data() + 2 * sizeof(milvus::Timestamp),
+        ser_data.size() - 2 * sizeof(milvus::Timestamp));
+
+    parquet::arrow::FileReaderBuilder reader_builder;
+    auto s = reader_builder.Open(buffer);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+    s = reader_builder.Build(&arrow_reader);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+
+    std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
+    s = arrow_reader->GetRecordBatchReader(&rb_reader);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+
+    FieldMeta field_meta(FieldName("a"),
+                         milvus::FieldId(1),
+                         DataType::ARRAY,
+                         DataType::STRING,
+                         false);
+    auto chunk = create_chunk(field_meta, 1, rb_reader);
+    auto array_chunk = std::dynamic_pointer_cast<ArrayChunk>(chunk);
+    ASSERT_TRUE(array_chunk != nullptr);
+    auto span = array_chunk->Span();
+    // Fatal on mismatch: the first ArrayView is dereferenced right below.
+    ASSERT_EQ(span.row_count(), 1);
+    auto arr = *static_cast<const ArrayView*>(span.data());
+    // Without this check an empty view would let the loop pass vacuously.
+    ASSERT_EQ(arr.length(), field_string_data.string_data().data_size());
+    for (int i = 0; i < arr.length(); ++i) {
+        auto str = arr.get_data<std::string>(i);
+        EXPECT_EQ(str, field_string_data.string_data().data(i));
+    }
+}
+
+// Round-trips 100 random sparse float vectors and compares every
+// (index, value) entry of every row.
+TEST(chunk, test_sparse_float) {
+    constexpr int n_rows = 100;
+    auto vecs = milvus::segcore::GenerateRandomSparseFloatVector(
+        n_rows, kTestSparseDim, kTestSparseVectorDensity);
+    auto field_data = milvus::storage::CreateFieldData(
+        storage::DataType::VECTOR_SPARSE_FLOAT, false, kTestSparseDim, n_rows);
+    field_data->FillFieldData(vecs.get(), n_rows);
+
+    storage::InsertEventData event_data;
+    event_data.field_data = field_data;
+    auto ser_data = event_data.Serialize();
+    auto buffer = std::make_shared<arrow::io::BufferReader>(
+        ser_data.data() + 2 * sizeof(milvus::Timestamp),
+        ser_data.size() - 2 * sizeof(milvus::Timestamp));
+
+    parquet::arrow::FileReaderBuilder reader_builder;
+    auto s = reader_builder.Open(buffer);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+    s = reader_builder.Build(&arrow_reader);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+
+    std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
+    s = arrow_reader->GetRecordBatchReader(&rb_reader);
+    ASSERT_TRUE(s.ok()) << s.ToString();
+
+    FieldMeta field_meta(FieldName("a"),
+                         milvus::FieldId(1),
+                         DataType::VECTOR_SPARSE_FLOAT,
+                         kTestSparseDim,
+                         "IP",
+                         false);
+    auto chunk = create_chunk(field_meta, kTestSparseDim, rb_reader);
+    auto sparse_chunk =
+        std::dynamic_pointer_cast<SparseFloatVectorChunk>(chunk);
+    ASSERT_TRUE(sparse_chunk != nullptr);
+    const auto& vec = sparse_chunk->Vec();
+    // Fatal on mismatch: vec[i] below is indexed up to n_rows.
+    ASSERT_EQ(vec.size(), static_cast<size_t>(n_rows));
+    for (int i = 0; i < n_rows; ++i) {
+        const auto& v1 = vec[i];
+        const auto& v2 = vecs[i];
+        // Fatal on mismatch: v2[j] below is indexed up to v1.size().
+        ASSERT_EQ(v1.size(), v2.size());
+        for (size_t j = 0; j < v1.size(); ++j) {
+            // Compare the index as well as the value; a value-only check
+            // would not notice entries landing in the wrong dimension.
+            EXPECT_EQ(v1[j].id, v2[j].id);
+            EXPECT_EQ(v1[j].val, v2[j].val);
+        }
+    }
+}
\ No newline at end of file