Skip to content

Commit

Permalink
enhance: add chunk basic impl (milvus-io#34634)
Browse files Browse the repository at this point in the history
milvus-io#35112
This pr would not affect milvus functionality by now.
It implments a Chunk memory layout that looks like 
```
VariableColumn
|offset|offset|offset|
|data|data|data|

```
We maybe move offsets to the beginning and add null bitmaps later but
not in this PR.
And mmap test will also be added in another PR.

---------

Signed-off-by: sunby <[email protected]>
  • Loading branch information
sunby authored Aug 1, 2024
1 parent e9d61da commit f229f24
Show file tree
Hide file tree
Showing 11 changed files with 1,250 additions and 0 deletions.
3 changes: 3 additions & 0 deletions internal/core/src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ set(COMMON_SRC
EasyAssert.cpp
FieldData.cpp
RegexQuery.cpp
ChunkTarget.cpp
Chunk.cpp
ChunkWriter.cpp
)

add_library(milvus_common SHARED ${COMMON_SRC})
Expand Down
61 changes: 61 additions & 0 deletions internal/core/src/common/Chunk.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include <sys/mman.h>
#include <cstdint>
#include "common/Array.h"
#include "common/Span.h"
#include "common/Types.h"
#include "common/Chunk.h"

namespace milvus {

std::vector<std::string_view>
StringChunk::StringViews() const {
std::vector<std::string_view> ret;
for (int i = 0; i < row_nums_ - 1; i++) {
ret.emplace_back(data_ + offsets_[i], offsets_[i + 1] - offsets_[i]);
}
ret.emplace_back(data_ + offsets_[row_nums_ - 1],
size_ - MMAP_STRING_PADDING - offsets_[row_nums_ - 1]);
return ret;
}

void
ArrayChunk::ConstructViews() {
views_.reserve(row_nums_);

for (int i = 0; i < row_nums_; ++i) {
auto data_ptr = data_ + offsets_[i];
auto next_data_ptr = i == row_nums_ - 1
? data_ + size_ - MMAP_ARRAY_PADDING
: data_ + offsets_[i + 1];
auto offsets_len = lens_[i] * sizeof(uint64_t);
std::vector<uint64_t> element_indices = {};
if (IsStringDataType(element_type_)) {
std::vector<uint64_t> tmp(
reinterpret_cast<uint64_t*>(data_ptr),
reinterpret_cast<uint64_t*>(data_ptr + offsets_len));
element_indices = std::move(tmp);
}
views_.emplace_back(data_ptr + offsets_len,
next_data_ptr - data_ptr - offsets_len,
element_type_,
std::move(element_indices));
}
}

SpanBase
ArrayChunk::Span() const {
return SpanBase(views_.data(), views_.size(), sizeof(ArrayView));
}

} // namespace milvus
148 changes: 148 additions & 0 deletions internal/core/src/common/Chunk.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#pragma once

#include <cstddef>
#include <cstdint>
#include <memory>
#include <string_view>
#include <utility>
#include <vector>
#include "arrow/array/array_base.h"
#include "arrow/record_batch.h"
#include "common/Array.h"
#include "common/ChunkTarget.h"
#include "common/FieldDataInterface.h"
#include "common/Json.h"
#include "common/Span.h"
#include "knowhere/sparse_utils.h"
#include "simdjson/common_defs.h"
#include "sys/mman.h"
namespace milvus {
constexpr size_t MMAP_STRING_PADDING = 1;
constexpr size_t MMAP_ARRAY_PADDING = 1;
class Chunk {
public:
Chunk() = default;
Chunk(int64_t row_nums, char* data, size_t size)
: row_nums_(row_nums), data_(data), size_(size) {
}
virtual ~Chunk() {
munmap(data_, size_);
}

protected:
char* data_;
int64_t row_nums_;
size_t size_;
};

// for fixed size data, includes fixed size array
template <typename T>
class FixedWidthChunk : public Chunk {
public:
FixedWidthChunk(int32_t row_nums, int32_t dim, char* data, size_t size)
: Chunk(row_nums, data, size), dim_(dim){};

milvus::SpanBase
Span() const {
auto null_bitmap_bytes_num = (row_nums_ + 7) / 8;
return milvus::SpanBase(
data_ + null_bitmap_bytes_num, row_nums_, sizeof(T) * dim_);
}

private:
int dim_;
};

class StringChunk : public Chunk {
public:
StringChunk() = default;
StringChunk(int32_t row_nums, char* data, size_t size)
: Chunk(row_nums, data, size) {
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
offsets_ = reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
}

std::vector<std::string_view>
StringViews() const;

protected:
uint64_t* offsets_;
};

using JSONChunk = StringChunk;

class ArrayChunk : public Chunk {
public:
ArrayChunk(int32_t row_nums,
char* data,
size_t size,
milvus::DataType element_type)
: Chunk(row_nums, data, size), element_type_(element_type) {
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
offsets_ = reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
lens_ = offsets_ + row_nums;
ConstructViews();
}

SpanBase
Span() const;

void
ConstructViews();

private:
milvus::DataType element_type_;
uint64_t* offsets_;
uint64_t* lens_;
std::vector<ArrayView> views_;
};

class SparseFloatVectorChunk : public Chunk {
public:
SparseFloatVectorChunk(int32_t row_nums, char* data, size_t size)
: Chunk(row_nums, data, size) {
vec_.resize(row_nums);
auto null_bitmap_bytes_num = (row_nums + 7) / 8;
auto offsets_ptr =
reinterpret_cast<uint64_t*>(data + null_bitmap_bytes_num);
for (int i = 0; i < row_nums; i++) {
int vec_size = 0;
if (i == row_nums - 1) {
vec_size = size - offsets_ptr[i];
} else {
vec_size = offsets_ptr[i + 1] - offsets_ptr[i];
}

vec_[i] = {
vec_size / knowhere::sparse::SparseRow<float>::element_size(),
(uint8_t*)(data + offsets_ptr[i]),
false};
}
}

const char*
Data() const {
return static_cast<const char*>(static_cast<const void*>(vec_.data()));
}

// only for test
std::vector<knowhere::sparse::SparseRow<float>>&
Vec() {
return vec_;
}

private:
std::vector<knowhere::sparse::SparseRow<float>> vec_;
};
} // namespace milvus
36 changes: 36 additions & 0 deletions internal/core/src/common/ChunkFileWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License


#include "common/ChunkFileWriter.h"
#include "common/ChunkWriter.h"
namespace milvus {
ChunkFileWriter::ChunkFileWriter(std::string& file_path)
: file_(File::Open(file_path, O_RDWR | O_CREAT | O_TRUNC)) {
}

void
ChunkFileWriter::write_chunk(std::shared_ptr<arrow::RecordBatchReader> r) {
// FIXME
size_t file_offset = file_.Seek(0, SEEK_END);
auto chunk = create_chunk(field_meta_, dim_, file_, file_offset, r);
// TODO: stat_writer_.write(chunk);
rep_.chunks.push_back(*chunk);
}

FileRep
ChunkFileWriter::finish() {
// TODO: stat_writer_.finish();
// rep_.stat_chunk = stat_writer_.get();
return rep_;
}

} // namespace milvus
44 changes: 44 additions & 0 deletions internal/core/src/common/ChunkFileWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#pragma once
#include <memory>
#include "arrow/record_batch.h"
#include "arrow/table_builder.h"
#include "common/Chunk.h"
#include "common/ChunkTarget.h"
#include "common/FieldMeta.h"
namespace milvus {
class StatisticsChunkWriter;
class StatisticsChunk;

class ChunkFileWriter {
public:
ChunkFileWriter() = default;
ChunkFileWriter(std::string& file_path);
struct FileRep {
std::vector<Chunk> chunks;
};
void
write_chunk(std::shared_ptr<arrow::RecordBatchReader> r);

FileRep
finish();

private:
FieldMeta& field_meta_;
int dim_;
StatisticsChunkWriter stat_writer_;
File file_;
FileRep rep_;
};

} // namespace milvus
74 changes: 74 additions & 0 deletions internal/core/src/common/ChunkTarget.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License

#include <common/ChunkTarget.h>
#include <cstring>
#include "common/EasyAssert.h"
#include <sys/mman.h>

namespace milvus {
void
MemChunkTarget::write(const void* data, size_t size, bool append) {
AssertInfo(size + size_ <= cap_, "can not exceed target capacity");
std::memcpy(data_ + size_, data, size);
size_ += append ? size : 0;
}

void
MemChunkTarget::skip(size_t size) {
size_ += size;
}

void
MemChunkTarget::seek(size_t offset) {
size_ = offset;
}

std::pair<char*, size_t>
MemChunkTarget::get() {
return {data_, cap_};
}

size_t
MemChunkTarget::tell() {
return size_;
}

void
MmapChunkTarget::write(const void* data, size_t size, bool append) {
auto n = file_.Write(data, size);
AssertInfo(n != -1, "failed to write data to file");
size_ += append ? size : 0;
}

void
MmapChunkTarget::skip(size_t size) {
file_.Seek(size, SEEK_CUR);
size_ += size;
}

void
MmapChunkTarget::seek(size_t offset) {
file_.Seek(offset_ + offset, SEEK_SET);
}

std::pair<char*, size_t>
MmapChunkTarget::get() {
auto m = mmap(
nullptr, size_, PROT_READ, MAP_SHARED, file_.Descriptor(), offset_);
return {(char*)m, size_};
}

size_t
MmapChunkTarget::tell() {
return size_;
}
} // namespace milvus
Loading

0 comments on commit f229f24

Please sign in to comment.