Skip to content

Commit

Permalink
fix:add chunk writer for geometry
Browse files Browse the repository at this point in the history
Signed-off-by: tasty-gumi <[email protected]>
  • Loading branch information
tasty-gumi committed Nov 13, 2024
1 parent 82ceb13 commit 95b17a4
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 0 deletions.
2 changes: 2 additions & 0 deletions internal/core/src/common/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/Types.h"
namespace milvus {
// Padding sizes, in bytes, for variable-length chunk layouts.
// MMAP_GEOMETRY_PADDING is the number of bytes GeometryChunkWriter appends
// after the last WKB payload; the string/array constants presumably play
// the same role for their writers (not visible here — confirm).
constexpr uint64_t MMAP_STRING_PADDING = 1;
constexpr uint64_t MMAP_GEOMETRY_PADDING = 1;
constexpr uint64_t MMAP_ARRAY_PADDING = 1;
class Chunk {
public:
Expand Down Expand Up @@ -185,6 +186,7 @@ class StringChunk : public Chunk {
};

// JSON and geometry columns do not have dedicated chunk classes: both are
// aliases of StringChunk and therefore share its type and interface.
using JSONChunk = StringChunk;
using GeometryChunk = StringChunk;

class ArrayChunk : public Chunk {
public:
Expand Down
72 changes: 72 additions & 0 deletions internal/core/src/common/ChunkWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/Chunk.h"
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Geometry.h"
#include "common/Types.h"
#include "common/VectorTrait.h"
#include "simdjson/common_defs.h"
Expand Down Expand Up @@ -156,6 +157,69 @@ JSONChunkWriter::finish() {
return std::make_shared<JSONChunk>(row_nums_, data, size, nullable_);
}

// Serializes every record batch produced by `data` into the chunk target.
//
// Column 0 of each batch is read as an arrow::BinaryArray whose values are
// WKB-encoded geometries. The resulting layout is:
//   null bitmap(s), offset1, ..., offsetn, end offset, wkb1, ..., wkbn
// (the trailing padding is appended by finish()).
//
// The column arrays are retained until the function returns so that the
// null-bitmap pointers and value views stay valid while they are written,
// and all byte counts/offsets are computed in 64-bit unsigned arithmetic so
// that large chunks do not overflow.
//
// NOTE(review): the dynamic_pointer_cast result is not checked; a column of
// any other arrow type would dereference a null pointer — confirm callers
// guarantee a binary column.
// NOTE(review): each batch contributes its own byte-aligned bitmap, so with
// several batches the bitmaps are concatenated per batch rather than
// bit-packed across the whole chunk — confirm the reader expects this.
void
GeometryChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
    size_t size = 0;
    // Keeping the arrays alive keeps their value and validity buffers alive.
    std::vector<std::shared_ptr<arrow::BinaryArray>> arrays;
    for (auto batch : *data) {
        auto column = batch.ValueOrDie()->column(0);
        auto array = std::dynamic_pointer_cast<arrow::BinaryArray>(column);
        const int64_t rows = array->length();
        for (int64_t i = 0; i < rows; ++i) {
            size += array->GetView(i).size();
        }
        // one validity bit per row, rounded up to whole bytes
        size += static_cast<size_t>((rows + 7) / 8);
        row_nums_ += rows;
        arrays.push_back(array);
    }
    size += sizeof(uint64_t) * (row_nums_ + 1) + MMAP_GEOMETRY_PADDING;
    if (file_) {
        target_ = std::make_shared<MmapChunkTarget>(*file_, file_offset_);
    } else {
        target_ = std::make_shared<MemChunkTarget>(size);
    }

    // chunk layout: null bitmap, offset1, offset2, ..., offsetn, wkb1, wkb2, ..., wkbn, padding
    // write null bitmaps
    for (const auto& array : arrays) {
        const int64_t bitmap_bytes = (array->length() + 7) / 8;
        const uint8_t* bitmap = array->null_bitmap_data();
        if (bitmap == nullptr) {
            // no validity buffer: emit an all-ones bitmap (every row valid)
            std::vector<uint8_t> all_valid(bitmap_bytes, 0xff);
            target_->write(all_valid.data(), bitmap_bytes);
        } else {
            target_->write(bitmap, bitmap_bytes);
        }
    }

    // Offsets are absolute positions in the target; the first payload starts
    // right after the (row_nums_ + 1)-entry offset table.
    const size_t offset_num = static_cast<size_t>(row_nums_) + 1;
    uint64_t next_offset = target_->tell() + sizeof(uint64_t) * offset_num;
    std::vector<uint64_t> offsets;
    offsets.reserve(offset_num);

    for (const auto& array : arrays) {
        const int64_t rows = array->length();
        for (int64_t i = 0; i < rows; ++i) {
            offsets.push_back(next_offset);
            next_offset += array->GetView(i).size();
        }
    }
    // sentinel: end of the last payload
    offsets.push_back(next_offset);

    target_->write(offsets.data(), offsets.size() * sizeof(uint64_t));

    for (const auto& array : arrays) {
        const int64_t rows = array->length();
        for (int64_t i = 0; i < rows; ++i) {
            const auto wkb = array->GetView(i);
            target_->write(wkb.data(), wkb.size());
        }
    }
}

// Appends MMAP_GEOMETRY_PADDING trailing bytes to the target and wraps the
// written buffer in a GeometryChunk.
//
// The padding is zero-initialized so that no indeterminate stack bytes end
// up in the chunk (or in the mmap-backed file).
std::shared_ptr<Chunk>
GeometryChunkWriter::finish() {
    // write padding, maybe not needed anymore
    // FIXME
    const char padding[MMAP_GEOMETRY_PADDING] = {};
    target_->write(padding, MMAP_GEOMETRY_PADDING);
    auto [data, size] = target_->get();
    return std::make_shared<GeometryChunk>(row_nums_, data, size, nullable_);
}

void
ArrayChunkWriter::write(std::shared_ptr<arrow::RecordBatchReader> data) {
auto size = 0;
Expand Down Expand Up @@ -383,6 +447,9 @@ create_chunk(const FieldMeta& field_meta,
w = std::make_shared<JSONChunkWriter>(nullable);
break;
}
case milvus::DataType::GEOMETRY: {
    w = std::make_shared<GeometryChunkWriter>(nullable);
    // Without this break control falls through into the ARRAY case, which
    // replaces `w` with an ArrayChunkWriter.
    break;
}
case milvus::DataType::ARRAY: {
w = std::make_shared<ArrayChunkWriter>(
field_meta.get_element_type(), nullable);
Expand Down Expand Up @@ -479,6 +546,11 @@ create_chunk(const FieldMeta& field_meta,
w = std::make_shared<JSONChunkWriter>(file, file_offset, nullable);
break;
}
case milvus::DataType::GEOMETRY: {
w = std::make_shared<GeometryChunkWriter>(
file, file_offset, nullable);
break;
}
case milvus::DataType::ARRAY: {
w = std::make_shared<ArrayChunkWriter>(
field_meta.get_element_type(), file, file_offset, nullable);
Expand Down
11 changes: 11 additions & 0 deletions internal/core/src/common/ChunkWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "common/Chunk.h"
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Geometry.h"
namespace milvus {

class ChunkWriterBase {
Expand Down Expand Up @@ -180,6 +181,16 @@ class JSONChunkWriter : public ChunkWriterBase {
finish() override;
};

// Chunk writer for geometry columns.
// Reuses the constructors of ChunkWriterBase and overrides the two
// serialization hooks below.
class GeometryChunkWriter : public ChunkWriterBase {
public:
// Inherit every ChunkWriterBase constructor unchanged.
using ChunkWriterBase::ChunkWriterBase;
// Reads the record batches from `data` and writes their first column
// (binary geometry values) into the chunk target.
void
write(std::shared_ptr<arrow::RecordBatchReader> data) override;

// Appends the trailing padding and returns the finished chunk
// (a GeometryChunk in the implementation).
std::shared_ptr<Chunk>
finish() override;
};

class ArrayChunkWriter : public ChunkWriterBase {
public:
ArrayChunkWriter(const milvus::DataType element_type, bool nullable)
Expand Down

0 comments on commit 95b17a4

Please sign in to comment.