Skip to content

Commit

Permalink
enhance: optimize retrieve on dynamic field (#35580)
Browse files Browse the repository at this point in the history
#35514

Signed-off-by: luzhang <[email protected]>
Co-authored-by: luzhang <[email protected]>
Co-authored-by: zhenshan.cao <[email protected]>
  • Loading branch information
3 people authored Aug 22, 2024
1 parent 570a887 commit 3107701
Show file tree
Hide file tree
Showing 20 changed files with 551 additions and 365 deletions.
1 change: 1 addition & 0 deletions internal/core/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class MilvusConan(ConanFile):
"abseil/20230125.3#dad7cc4c83bbd44c1f1cc9cc4d97ac88",
"roaring/3.0.0#25a703f80eda0764a31ef939229e202d",
"grpc/1.50.1@milvus/dev#75103960d1cac300cf425ccfccceac08",
"rapidjson/cci.20230929#624c0094d741e6a3749d2e44d834b96c"
)
generators = ("cmake", "cmake_find_package")
default_options = {
Expand Down
34 changes: 34 additions & 0 deletions internal/core/src/common/Json.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,42 @@
#include "simdjson/dom/element.h"
#include "simdjson/error.h"
#include "simdjson/padded_string.h"
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"

namespace milvus {
// Extracts the requested top-level keys from a serialized JSON document and
// returns them re-serialized as a new JSON object.
//
// rapidjson is used here (rather than simdjson, which the rest of this header
// relies on for parsing) because this helper has to rebuild and serialize a
// document, not just read one.
//
// @param json  serialized JSON text; it is expected to be a JSON object.
// @param keys  top-level member names to keep. Names missing from `json` are
//              skipped; a name listed more than once is emitted only once.
// @return      serialized JSON object holding the selected members, in the
//              order of `keys`; "{}" when nothing matches or when the root of
//              `json` is not an object.
// Parse failures are reported through PanicInfo(ErrorCode::UnexpectedError).
inline std::string
ExtractSubJson(const std::string& json, const std::vector<std::string>& keys) {
    rapidjson::Document doc;
    // Pass the length explicitly so parsing does not rely on a strlen scan.
    doc.Parse(json.data(), json.size());
    if (doc.HasParseError()) {
        PanicInfo(ErrorCode::UnexpectedError,
                  "json parse failed, error:{}",
                  doc.GetParseError());
    }

    rapidjson::Document result_doc;
    result_doc.SetObject();
    rapidjson::Document::AllocatorType& allocator = result_doc.GetAllocator();

    // Member lookup is only defined on object values; a scalar/array root
    // simply has no keys to extract.
    if (doc.IsObject()) {
        for (const auto& key : keys) {
            // Single lookup instead of HasMember() followed by operator[].
            auto member = doc.FindMember(key.c_str());
            if (member == doc.MemberEnd()) {
                continue;
            }
            // AddMember moves the value out of `doc`, leaving a null behind.
            // Skip repeated keys so that null is never emitted as a duplicate
            // member.
            if (result_doc.HasMember(key.c_str())) {
                continue;
            }
            // The moved value may still reference memory owned by `doc`;
            // `doc` stays alive until serialization below has finished.
            result_doc.AddMember(rapidjson::Value(key.c_str(), allocator),
                                 member->value,
                                 allocator);
        }
    }

    rapidjson::StringBuffer buffer;
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
    result_doc.Accept(writer);
    return std::string(buffer.GetString(), buffer.GetSize());
}

using document = simdjson::ondemand::document;
template <typename T>
using value_result = simdjson::simdjson_result<T>;
Expand Down
7 changes: 7 additions & 0 deletions internal/core/src/common/Schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ Schema::ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto) {
"repetitive primary key");
schema->set_primary_field_id(field_id);
}

if (child.is_dynamic()) {
Assert(schema_proto.enable_dynamic_field());
AssertInfo(!schema->get_dynamic_field_id().has_value(),
"repetitive dynamic field");
schema->set_dynamic_field_id(field_id);
}
}

AssertInfo(schema->get_primary_field_id().has_value(),
Expand Down
11 changes: 11 additions & 0 deletions internal/core/src/common/Schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ class Schema {
this->primary_field_id_opt_ = field_id;
}

// Stores `field_id` as the schema's dynamic field id.
void
set_dynamic_field_id(FieldId field_id) {
    dynamic_field_id_opt_ = field_id;
}

auto
begin() const {
return fields_.begin();
Expand Down Expand Up @@ -184,6 +189,11 @@ class Schema {
return primary_field_id_opt_;
}

std::optional<FieldId>
get_dynamic_field_id() const {
return dynamic_field_id_opt_;
}

public:
static std::shared_ptr<Schema>
ParseFrom(const milvus::proto::schema::CollectionSchema& schema_proto);
Expand Down Expand Up @@ -213,6 +223,7 @@ class Schema {
std::unordered_map<FieldId, FieldName> id_names_; // field_id -> field_name

std::optional<FieldId> primary_field_id_opt_;
std::optional<FieldId> dynamic_field_id_opt_;
};

using SchemaPtr = std::shared_ptr<Schema>;
Expand Down
2 changes: 2 additions & 0 deletions internal/core/src/query/PlanImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct Plan {
std::unique_ptr<VectorPlanNode> plan_node_;
std::map<std::string, FieldId> tag2field_; // PlaceholderName -> FieldId
std::vector<FieldId> target_entries_;
std::vector<std::string> target_dynamic_fields_;
void
check_identical(Plan& other);

Expand Down Expand Up @@ -100,6 +101,7 @@ struct RetrievePlan {
const Schema& schema_;
std::unique_ptr<RetrievePlanNode> plan_node_;
std::vector<FieldId> field_ids_;
std::vector<std::string> target_dynamic_fields_;
};

using PlanPtr = std::unique_ptr<Plan>;
Expand Down
11 changes: 11 additions & 0 deletions internal/core/src/query/PlanProto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ ProtoParser::RetrievePlanNodeFromProto(

std::unique_ptr<Plan>
ProtoParser::CreatePlan(const proto::plan::PlanNode& plan_node_proto) {
LOG_DEBUG("create search plan from proto: {}",
plan_node_proto.DebugString());
auto plan = std::make_unique<Plan>(schema);

auto plan_node = PlanNodeFromProto(plan_node_proto);
Expand All @@ -320,12 +322,17 @@ ProtoParser::CreatePlan(const proto::plan::PlanNode& plan_node_proto) {
auto field_id = FieldId(field_id_raw);
plan->target_entries_.push_back(field_id);
}
for (auto dynamic_field : plan_node_proto.dynamic_fields()) {
plan->target_dynamic_fields_.push_back(dynamic_field);
}

return plan;
}

std::unique_ptr<RetrievePlan>
ProtoParser::CreateRetrievePlan(const proto::plan::PlanNode& plan_node_proto) {
LOG_DEBUG("create retrieve plan from proto: {}",
plan_node_proto.DebugString());
auto retrieve_plan = std::make_unique<RetrievePlan>(schema);

auto plan_node = RetrievePlanNodeFromProto(plan_node_proto);
Expand All @@ -338,6 +345,10 @@ ProtoParser::CreateRetrievePlan(const proto::plan::PlanNode& plan_node_proto) {
auto field_id = FieldId(field_id_raw);
retrieve_plan->field_ids_.push_back(field_id);
}
for (auto dynamic_field : plan_node_proto.dynamic_fields()) {
retrieve_plan->target_dynamic_fields_.push_back(dynamic_field);
}

return retrieve_plan;
}

Expand Down
29 changes: 29 additions & 0 deletions internal/core/src/segcore/SegmentGrowingImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,35 @@ SegmentGrowingImpl::vector_search(SearchInfo& search_info,
*this, search_info, query_data, query_count, timestamp, bitset, output);
}

// Dynamic-field variant of bulk_subscript for growing segments.
//
// For each requested segment offset, reads the stored JSON value of
// `field_id` and keeps only the keys listed in `dynamic_field_names`
// (via ExtractSubJson), writing the reduced JSON into the result array.
//
// @param field_id             field to read; its data must be stored as a
//                             ConcurrentVector<Json>.
// @param seg_offsets          array of `count` segment offsets to fetch.
// @param count                number of entries in `seg_offsets`.
// @param dynamic_field_names  keys to keep from each JSON value; must not be
//                             empty.
// @return data array holding the reduced JSON values, plus the per-row
//         validity flags when the field is nullable.
std::unique_ptr<DataArray>
SegmentGrowingImpl::bulk_subscript(
    FieldId field_id,
    const int64_t* seg_offsets,
    int64_t count,
    const std::vector<std::string>& dynamic_field_names) const {
    Assert(!dynamic_field_names.empty());
    auto& field_meta = schema_->operator[](field_id);
    auto vec_ptr = insert_record_.get_data_base(field_id);
    auto result = CreateScalarDataArray(count, field_meta);
    if (field_meta.is_nullable()) {
        auto valid_data_ptr = insert_record_.get_valid_data(field_id);
        auto res = result->mutable_valid_data()->mutable_data();
        for (int64_t i = 0; i < count; ++i) {
            auto offset = seg_offsets[i];
            res[i] = valid_data_ptr->is_valid(offset);
        }
    }
    auto vec = dynamic_cast<const ConcurrentVector<Json>*>(vec_ptr);
    // dynamic_cast yields nullptr when the field is not stored as JSON;
    // fail loudly here instead of dereferencing a null pointer below.
    Assert(vec != nullptr);
    auto dst = result->mutable_scalars()->mutable_json_data()->mutable_data();
    auto& src = *vec;
    for (int64_t i = 0; i < count; ++i) {
        auto offset = seg_offsets[i];
        dst->at(i) =
            ExtractSubJson(std::string(src[offset]), dynamic_field_names);
    }
    return result;
}

std::unique_ptr<DataArray>
SegmentGrowingImpl::bulk_subscript(FieldId field_id,
const int64_t* seg_offsets,
Expand Down
7 changes: 7 additions & 0 deletions internal/core/src/segcore/SegmentGrowingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ class SegmentGrowingImpl : public SegmentGrowing {
const int64_t* seg_offsets,
int64_t count) const override;

// Dynamic-field overload of bulk_subscript (overrides the base-class
// virtual): fetches the JSON values of `field_id` at the `count` offsets in
// `seg_offsets`, keeping only the keys named in `dynamic_field_names`.
std::unique_ptr<DataArray>
bulk_subscript(
FieldId field_id,
const int64_t* seg_offsets,
int64_t count,
const std::vector<std::string>& dynamic_field_names) const override;

public:
friend std::unique_ptr<SegmentGrowing>
CreateGrowingSegment(SchemaPtr schema,
Expand Down
25 changes: 23 additions & 2 deletions internal/core/src/segcore/SegmentInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,21 @@ SegmentInternalInterface::FillTargetEntry(const query::Plan* plan,
AssertInfo(results.seg_offsets_.size() == size,
"Size of result distances is not equal to size of ids");

std::unique_ptr<DataArray> field_data;
// fill other entries except primary key by result_offset
for (auto field_id : plan->target_entries_) {
auto field_data =
bulk_subscript(field_id, results.seg_offsets_.data(), size);
if (plan->schema_.get_dynamic_field_id().has_value() &&
plan->schema_.get_dynamic_field_id().value() == field_id &&
!plan->target_dynamic_fields_.empty()) {
auto& target_dynamic_fields = plan->target_dynamic_fields_;
field_data = std::move(bulk_subscript(field_id,
results.seg_offsets_.data(),
size,
target_dynamic_fields));
} else {
field_data = std::move(
bulk_subscript(field_id, results.seg_offsets_.data(), size));
}
results.output_fields_data_[field_id] = std::move(field_data);
}
}
Expand Down Expand Up @@ -167,6 +178,16 @@ SegmentInternalInterface::FillTargetEntry(
continue;
}

if (plan->schema_.get_dynamic_field_id().has_value() &&
plan->schema_.get_dynamic_field_id().value() == field_id &&
!plan->target_dynamic_fields_.empty()) {
auto& target_dynamic_fields = plan->target_dynamic_fields_;
auto col =
bulk_subscript(field_id, offsets, size, target_dynamic_fields);
fields_data->AddAllocated(col.release());
continue;
}

auto& field_meta = plan->schema_[field_id];

auto col = bulk_subscript(field_id, offsets, size);
Expand Down
7 changes: 7 additions & 0 deletions internal/core/src/segcore/SegmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,13 @@ class SegmentInternalInterface : public SegmentInterface {
const int64_t* seg_offsets,
int64_t count) const = 0;

// Pure-virtual overload of bulk_subscript that additionally receives a list
// of dynamic field names. Concrete segment types implement it; in the
// implementations shown for growing and sealed segments, each fetched JSON
// value is reduced to the keys listed in `dynamic_field_names`.
virtual std::unique_ptr<DataArray>
bulk_subscript(
FieldId field_ids,
const int64_t* seg_offsets,
int64_t count,
const std::vector<std::string>& dynamic_field_names) const = 0;

virtual void
check_search(const query::Plan* plan) const = 0;

Expand Down
31 changes: 31 additions & 0 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,37 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id,
return get_raw_data(field_id, field_meta, seg_offsets, count);
}

// Dynamic-field variant of bulk_subscript for sealed segments.
//
// Reads the JSON value of `field_id` at each of the `count` offsets in
// `seg_offsets` and stores, per row, only the keys listed in
// `dynamic_field_names` (via ExtractSubJson). For nullable columns the
// per-row validity flags are copied into the result as well.
// `dynamic_field_names` must not be empty.
std::unique_ptr<DataArray>
SegmentSealedImpl::bulk_subscript(
    FieldId field_id,
    const int64_t* seg_offsets,
    int64_t count,
    const std::vector<std::string>& dynamic_field_names) const {
    Assert(!dynamic_field_names.empty());
    // The result is unused below; the lookup is kept as in the original
    // (presumably it validates that the field exists in the schema).
    auto& field_meta = schema_->operator[](field_id);
    if (count == 0) {
        return fill_with_empty(field_id, 0);
    }

    auto column = fields_.at(field_id);
    auto result = fill_with_empty(field_id, count);

    // Copy validity flags row by row for nullable columns.
    if (column->IsNullable()) {
        auto valid_out = result->mutable_valid_data()->mutable_data();
        for (int64_t row = 0; row < count; ++row) {
            valid_out[row] = column->IsValid(seg_offsets[row]);
        }
    }

    // Reduce every fetched JSON value to the requested keys.
    auto json_out =
        result->mutable_scalars()->mutable_json_data()->mutable_data();
    auto json_column =
        reinterpret_cast<const VariableColumn<Json>*>(column.get());
    for (int64_t row = 0; row < count; ++row) {
        const auto offset = seg_offsets[row];
        json_out->at(row) = ExtractSubJson(
            std::string(json_column->RawAt(offset)), dynamic_field_names);
    }
    return result;
}

bool
SegmentSealedImpl::HasIndex(FieldId field_id) const {
std::shared_lock lck(mutex_);
Expand Down
7 changes: 7 additions & 0 deletions internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ class SegmentSealedImpl : public SegmentSealed {
const int64_t* seg_offsets,
int64_t count) const override;

// Dynamic-field overload of bulk_subscript (overrides the base-class
// virtual): fetches the JSON values of `field_id` at the `count` offsets in
// `seg_offsets`, keeping only the keys named in `dynamic_field_names`.
std::unique_ptr<DataArray>
bulk_subscript(
FieldId field_id,
const int64_t* seg_offsets,
int64_t count,
const std::vector<std::string>& dynamic_field_names) const override;

bool
is_mmap_field(FieldId id) const override;

Expand Down
Loading

0 comments on commit 3107701

Please sign in to comment.