diff --git a/internal/core/src/common/jsmn.h b/internal/core/src/common/jsmn.h index f20b56ba48a68..3843d6efe2d86 100644 --- a/internal/core/src/common/jsmn.h +++ b/internal/core/src/common/jsmn.h @@ -1,3 +1,14 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + /* * MIT License * diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.cpp b/internal/core/src/exec/expression/BinaryRangeExpr.cpp index 771e8db45a141..dfd9386d66d4c 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.cpp +++ b/internal/core/src/exec/expression/BinaryRangeExpr.cpp @@ -260,17 +260,17 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { TargetBitmapView valid_res(res_vec->GetValidRawData(), real_batch_size); valid_res.set(); - auto execute_sub_batch = [lower_inclusive, - upper_inclusive]( - const T* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - HighPrecisionType val1, - HighPrecisionType val2) { + auto execute_sub_batch = + [ lower_inclusive, + upper_inclusive ]( + const T* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + HighPrecisionType val1, + HighPrecisionType val2) { if (lower_inclusive && upper_inclusive) { BinaryRangeElementFunc func; func(val1, val2, data, size, res, offsets); @@ -365,18 +365,17 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForJson(OffsetVector* input) { ValueType val2 = GetValueFromProto(expr_->upper_val_); auto pointer = milvus::Json::pointer(expr_->column_.nested_path_); - auto execute_sub_batch = [lower_inclusive, - upper_inclusive, - pointer]( - const milvus::Json* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ValueType val1, - ValueType val2) { + auto execute_sub_batch = + [ lower_inclusive, upper_inclusive, + pointer ]( + const milvus::Json* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ValueType val1, + ValueType val2) { if (lower_inclusive && upper_inclusive) { BinaryRangeElementFuncForJson func; @@ -539,18 +538,18 @@ PhyBinaryRangeFilterExpr::ExecRangeVisitorImplForArray(OffsetVector* input) { index = std::stoi(expr_->column_.nested_path_[0]); } - auto execute_sub_batch = [lower_inclusive, - upper_inclusive]( - const milvus::ArrayView* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ValueType val1, - ValueType val2, - int index) { + auto execute_sub_batch = + [ lower_inclusive, + upper_inclusive ]( + const milvus::ArrayView* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ValueType val1, + ValueType val2, + int index) { if (lower_inclusive && upper_inclusive) { 
BinaryRangeElementFuncForArray func; diff --git a/internal/core/src/exec/expression/ExistsExpr.cpp b/internal/core/src/exec/expression/ExistsExpr.cpp index cc64dcb57c43b..9a4129a725c72 100644 --- a/internal/core/src/exec/expression/ExistsExpr.cpp +++ b/internal/core/src/exec/expression/ExistsExpr.cpp @@ -67,18 +67,18 @@ PhyExistsFilterExpr::EvalJsonExistsForDataSegment(OffsetVector* input) { TargetBitmapView res, TargetBitmapView valid_res, const std::string& pointer) { - for (int i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = data[offset].exist(pointer); + for (int i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; } - }; + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = data[offset].exist(pointer); + } + }; int64_t processed_size; if (has_offset_input_) { diff --git a/internal/core/src/exec/expression/JsonContainsExpr.cpp b/internal/core/src/exec/expression/JsonContainsExpr.cpp index 46cd3fc220a91..9ed579064d0d9 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.cpp +++ b/internal/core/src/exec/expression/JsonContainsExpr.cpp @@ -196,28 +196,27 @@ PhyJsonContainsFilterExpr::ExecArrayContains(OffsetVector* input) { TargetBitmapView res, TargetBitmapView valid_res, const std::unordered_set& elements) { - auto executor = [&](size_t i) { - const auto& array = data[i]; - for (int j = 0; j < array.length(); ++j) { - if (elements.count(array.template get_data(j)) > - 0) { - return true; - } - } - return false; - }; - for (int i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; + auto executor = [&](size_t i) { + const auto& array = data[i]; + for (int j = 0; j < array.length(); ++j) { + if (elements.count(array.template get_data(j)) > 0) { + return true; } - res[i] = executor(offset); } + return false; }; + for (int i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; int64_t processed_size; if (has_offset_input_) { @@ -280,35 +279,35 @@ PhyJsonContainsFilterExpr::ExecJsonContains(OffsetVector* input) { TargetBitmapView valid_res, const std::string& pointer, const std::unordered_set& elements) { - auto executor = [&](size_t i) { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - for (auto&& it : array) { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (elements.count(val.value()) > 0) { - return true; - } - } + auto executor = [&](size_t i) { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { return false; - }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? 
offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; + } + for (auto&& it : array) { + auto val = it.template get(); + if (val.error()) { continue; } - res[i] = executor(offset); + if (elements.count(val.value()) > 0) { + return true; + } } + return false; }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; int64_t processed_size; if (has_offset_input_) { @@ -433,44 +432,44 @@ PhyJsonContainsFilterExpr::ExecJsonContainsArray(OffsetVector* input) { TargetBitmapView valid_res, const std::string& pointer, const std::vector& elements) { - auto executor = [&](size_t i) -> bool { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - for (auto&& it : array) { - auto val = it.get_array(); - if (val.error()) { - continue; - } - std::vector< - simdjson::simdjson_result> - json_array; - json_array.reserve(val.count_elements()); - for (auto&& e : val) { - json_array.emplace_back(e); - } - for (auto const& element : elements) { - if (CompareTwoJsonArray(json_array, element)) { - return true; - } - } - } + auto executor = [&](size_t i) -> bool { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { return false; - }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; + } + for (auto&& it : array) { + auto val = it.get_array(); + if (val.error()) { continue; } - res[i] = executor(offset); + std::vector< + simdjson::simdjson_result> + json_array; + json_array.reserve(val.count_elements()); + for (auto&& e : val) { + json_array.emplace_back(e); + } + for (auto const& element : elements) { + if (CompareTwoJsonArray(json_array, element)) { + return true; + } + } } + return false; }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; int64_t processed_size; if (has_offset_input_) { @@ -593,29 +592,29 @@ PhyJsonContainsFilterExpr::ExecArrayContainsAll(OffsetVector* input) { TargetBitmapView res, TargetBitmapView valid_res, const std::unordered_set& elements) { - auto executor = [&](size_t i) { - std::unordered_set tmp_elements(elements); - // Note: array can only be iterated once - for (int j = 0; j < data[i].length(); ++j) { - tmp_elements.erase(data[i].template get_data(j)); - if (tmp_elements.size() == 0) { - return true; - } - } - return tmp_elements.size() == 0; - }; - for (int i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? 
offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; + auto executor = [&](size_t i) { + std::unordered_set tmp_elements(elements); + // Note: array can only be iterated once + for (int j = 0; j < data[i].length(); ++j) { + tmp_elements.erase(data[i].template get_data(j)); + if (tmp_elements.size() == 0) { + return true; } - res[i] = executor(offset); } + return tmp_elements.size() == 0; }; + for (int i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; int64_t processed_size; if (has_offset_input_) { @@ -678,38 +677,38 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAll(OffsetVector* input) { TargetBitmapView valid_res, const std::string& pointer, const std::unordered_set& elements) { - auto executor = [&](const size_t i) -> bool { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - std::unordered_set tmp_elements(elements); - // Note: array can only be iterated once - for (auto&& it : array) { - auto val = it.template get(); - if (val.error()) { - continue; - } - tmp_elements.erase(val.value()); - if (tmp_elements.size() == 0) { - return true; - } - } - return tmp_elements.size() == 0; - }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; + auto executor = [&](const size_t i) -> bool { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; + } + std::unordered_set tmp_elements(elements); + // Note: array can only be iterated once + for (auto&& it : array) { + auto val = it.template get(); + if (val.error()) { continue; } - res[i] = executor(offset); + tmp_elements.erase(val.value()); + if (tmp_elements.size() == 0) { + return true; + } } + return tmp_elements.size() == 0; }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? 
offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; int64_t processed_size; if (has_offset_input_) { @@ -841,98 +840,96 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllWithDiffType( const std::string& pointer, const std::vector& elements, const std::unordered_set elements_index) { - auto executor = [&](size_t i) -> bool { - const auto& json = data[i]; - auto doc = json.doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - std::unordered_set tmp_elements_index(elements_index); - for (auto&& it : array) { - int i = -1; - for (auto& element : elements) { - i++; - switch (element.val_case()) { - case proto::plan::GenericValue::kBoolVal: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.bool_val()) { - tmp_elements_index.erase(i); - } - break; + auto executor = [&](size_t i) -> bool { + const auto& json = data[i]; + auto doc = json.doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; + } + std::unordered_set tmp_elements_index(elements_index); + for (auto&& it : array) { + int i = -1; + for (auto& element : elements) { + i++; + switch (element.val_case()) { + case proto::plan::GenericValue::kBoolVal: { + auto val = it.template get(); + if (val.error()) { + continue; } - case proto::plan::GenericValue::kInt64Val: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.int64_val()) { - tmp_elements_index.erase(i); - } - break; + if (val.value() == element.bool_val()) { + tmp_elements_index.erase(i); } - case proto::plan::GenericValue::kFloatVal: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.float_val()) { - tmp_elements_index.erase(i); - } - break; + break; + } + case proto::plan::GenericValue::kInt64Val: { + auto val = it.template get(); + if (val.error()) { + continue; } - case proto::plan::GenericValue::kStringVal: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.string_val()) { - tmp_elements_index.erase(i); - } - break; + if (val.value() == element.int64_val()) { + tmp_elements_index.erase(i); + } + break; + } + case proto::plan::GenericValue::kFloatVal: { + auto val = it.template get(); + if (val.error()) { + continue; } - case proto::plan::GenericValue::kArrayVal: { - auto val = it.get_array(); - if (val.error()) { - continue; - } - if (CompareTwoJsonArray(val, - element.array_val())) { - tmp_elements_index.erase(i); - } - break; + if (val.value() == element.float_val()) { + tmp_elements_index.erase(i); } - default: - PanicInfo( - DataTypeInvalid, - fmt::format("unsupported data type {}", - element.val_case())); + break; } - if (tmp_elements_index.size() == 0) { - return true; + case proto::plan::GenericValue::kStringVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.string_val()) { + tmp_elements_index.erase(i); + } + break; + } + case proto::plan::GenericValue::kArrayVal: { + auto val = it.get_array(); + if (val.error()) { + continue; + } + if (CompareTwoJsonArray(val, element.array_val())) { + tmp_elements_index.erase(i); + } + break; } + default: + PanicInfo(DataTypeInvalid, + fmt::format("unsupported data type {}", + element.val_case())); } if (tmp_elements_index.size() == 0) { return true; } } - return tmp_elements_index.size() == 0; 
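(review note) The mixed-type ContainsAll matching in ExecJsonContainsAllWithDiffType reduces to: give every requested element an index, let each array item erase whichever indices it matches, and pass the row once the index set drains empty. A self-contained sketch of just that strategy, with the simdjson/proto plumbing elided; the int payloads are illustrative only:

    #include <unordered_set>
    #include <vector>

    // Each requested element gets an index; array items erase the indices
    // they match, and the document qualifies once every index is gone.
    static bool
    contains_all(const std::vector<int>& doc_array,
                 const std::vector<int>& requested) {
        std::unordered_set<size_t> pending;  // indices still unmatched
        for (size_t i = 0; i < requested.size(); ++i) {
            pending.insert(i);
        }
        for (int item : doc_array) {
            for (size_t i = 0; i < requested.size(); ++i) {
                if (item == requested[i]) {
                    pending.erase(i);
                }
            }
            if (pending.empty()) {
                return true;  // early exit, mirroring the executor lambda
            }
        }
        return pending.empty();
    }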
- }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; + if (tmp_elements_index.size() == 0) { + return true; } - res[i] = executor(offset); } + return tmp_elements_index.size() == 0; }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; int64_t processed_size; if (has_offset_input_) { @@ -1119,48 +1116,48 @@ PhyJsonContainsFilterExpr::ExecJsonContainsAllArray(OffsetVector* input) { TargetBitmapView valid_res, const std::string& pointer, const std::vector& elements) { - auto executor = [&](const size_t i) { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; + auto executor = [&](const size_t i) { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; + } + std::unordered_set exist_elements_index; + for (auto&& it : array) { + auto val = it.get_array(); + if (val.error()) { + continue; } - std::unordered_set exist_elements_index; - for (auto&& it : array) { - auto val = it.get_array(); - if (val.error()) { - continue; - } - std::vector< - simdjson::simdjson_result> - json_array; - json_array.reserve(val.count_elements()); - for (auto&& e : val) { - json_array.emplace_back(e); - } - for (int index = 0; index < elements.size(); ++index) { - if (CompareTwoJsonArray(json_array, elements[index])) { - exist_elements_index.insert(index); - } - } - if (exist_elements_index.size() == elements.size()) { - return true; - } + std::vector< + simdjson::simdjson_result> + json_array; + json_array.reserve(val.count_elements()); + for (auto&& e : val) { + json_array.emplace_back(e); } - return exist_elements_index.size() == elements.size(); - }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; + for (int index = 0; index < elements.size(); ++index) { + if (CompareTwoJsonArray(json_array, elements[index])) { + exist_elements_index.insert(index); + } } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; + if (exist_elements_index.size() == elements.size()) { + return true; } - res[i] = executor(offset); } + return exist_elements_index.size() == elements.size(); }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? 
offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; int64_t processed_size; if (has_offset_input_) { @@ -1290,90 +1287,88 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType(OffsetVector* input) { TargetBitmapView valid_res, const std::string& pointer, const std::vector& elements) { - auto executor = [&](const size_t i) { - auto& json = data[i]; - auto doc = json.doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) { - return false; - } - // Note: array can only be iterated once - for (auto&& it : array) { - for (auto const& element : elements) { - switch (element.val_case()) { - case proto::plan::GenericValue::kBoolVal: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.bool_val()) { - return true; - } - break; + auto executor = [&](const size_t i) { + auto& json = data[i]; + auto doc = json.doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) { + return false; + } + // Note: array can only be iterated once + for (auto&& it : array) { + for (auto const& element : elements) { + switch (element.val_case()) { + case proto::plan::GenericValue::kBoolVal: { + auto val = it.template get(); + if (val.error()) { + continue; } - case proto::plan::GenericValue::kInt64Val: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.int64_val()) { - return true; - } - break; + if (val.value() == element.bool_val()) { + return true; + } + break; + } + case proto::plan::GenericValue::kInt64Val: { + auto val = it.template get(); + if (val.error()) { + continue; } - case proto::plan::GenericValue::kFloatVal: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.float_val()) { - return true; - } - break; + if (val.value() == element.int64_val()) { + return true; } - case proto::plan::GenericValue::kStringVal: { - auto val = it.template get(); - if (val.error()) { - continue; - } - if (val.value() == element.string_val()) { - return true; - } - break; + break; + } + case proto::plan::GenericValue::kFloatVal: { + auto val = it.template get(); + if (val.error()) { + continue; } - case proto::plan::GenericValue::kArrayVal: { - auto val = it.get_array(); - if (val.error()) { - continue; - } - if (CompareTwoJsonArray(val, - element.array_val())) { - return true; - } - break; + if (val.value() == element.float_val()) { + return true; } - default: - PanicInfo( - DataTypeInvalid, - fmt::format("unsupported data type {}", - element.val_case())); + break; } + case proto::plan::GenericValue::kStringVal: { + auto val = it.template get(); + if (val.error()) { + continue; + } + if (val.value() == element.string_val()) { + return true; + } + break; + } + case proto::plan::GenericValue::kArrayVal: { + auto val = it.get_array(); + if (val.error()) { + continue; + } + if (CompareTwoJsonArray(val, element.array_val())) { + return true; + } + break; + } + default: + PanicInfo(DataTypeInvalid, + fmt::format("unsupported data type {}", + element.val_case())); } } - return false; - }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? 
offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = executor(offset); } + return false; }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; int64_t processed_size; if (has_offset_input_) { diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index de7b35f9e331c..ad68d05b4ef08 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -287,27 +287,27 @@ PhyTermFilterExpr::ExecTermArrayVariableInField(OffsetVector* input) { TargetBitmapView res, TargetBitmapView valid_res, const ValueType& target_val) { - auto executor = [&](size_t offset) { - for (int i = 0; i < data[offset].length(); i++) { - auto val = data[offset].template get_data(i); - if (val == target_val) { - return true; - } - } - return false; - }; - for (int i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; + auto executor = [&](size_t offset) { + for (int i = 0; i < data[offset].length(); i++) { + auto val = data[offset].template get_data(i); + if (val == target_val) { + return true; } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = executor(offset); } + return false; }; + for (int i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; int64_t processed_size; if (has_offset_input_) { @@ -374,23 +374,23 @@ PhyTermFilterExpr::ExecTermArrayFieldInVariable(OffsetVector* input) { TargetBitmapView valid_res, int index, const std::unordered_set& term_set) { - for (int i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - if (term_set.empty() || index >= data[offset].length()) { - res[i] = false; - continue; - } - auto value = data[offset].get_data(index); - res[i] = term_set.find(ValueType(value)) != term_set.end(); + for (int i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? 
offsets[i] : i; } - }; + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + if (term_set.empty() || index >= data[offset].length()) { + res[i] = false; + continue; + } + auto value = data[offset].get_data(index); + res[i] = term_set.find(ValueType(value)) != term_set.end(); + } + }; int64_t processed_size; if (has_offset_input_) { @@ -451,34 +451,34 @@ PhyTermFilterExpr::ExecTermJsonVariableInField(OffsetVector* input) { TargetBitmapView valid_res, const std::string pointer, const ValueType& target_val) { - auto executor = [&](size_t i) { - auto doc = data[i].doc(); - auto array = doc.at_pointer(pointer).get_array(); - if (array.error()) - return false; - for (auto it = array.begin(); it != array.end(); ++it) { - auto val = (*it).template get(); - if (val.error()) { - return false; - } - if (val.value() == target_val) { - return true; - } - } + auto executor = [&](size_t i) { + auto doc = data[i].doc(); + auto array = doc.at_pointer(pointer).get_array(); + if (array.error()) return false; - }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; + for (auto it = array.begin(); it != array.end(); ++it) { + auto val = (*it).template get(); + if (val.error()) { + return false; } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; + if (val.value() == target_val) { + return true; } - res[i] = executor(offset); } + return false; }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; int64_t processed_size; if (has_offset_input_) { processed_size = ProcessDataByOffsets(execute_sub_batch, @@ -615,40 +615,40 @@ PhyTermFilterExpr::ExecTermJsonFieldInVariable(OffsetVector* input) { TargetBitmapView valid_res, const std::string pointer, const std::unordered_set& terms) { - auto executor = [&](size_t i) { - auto x = data[i].template at(pointer); - if (x.error()) { - if constexpr (std::is_same_v) { - auto x = data[i].template at(pointer); - if (x.error()) { - return false; - } - - auto value = x.value(); - // if the term set is {1}, and the value is 1.1, we should not return true. - return std::floor(value) == value && - terms.find(ValueType(value)) != terms.end(); + auto executor = [&](size_t i) { + auto x = data[i].template at(pointer); + if (x.error()) { + if constexpr (std::is_same_v) { + auto x = data[i].template at(pointer); + if (x.error()) { + return false; } - return false; - } - return terms.find(ValueType(x.value())) != terms.end(); - }; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - if (terms.empty()) { - res[i] = false; - continue; + + auto value = x.value(); + // if the term set is {1}, and the value is 1.1, we should not return true. 
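+                // (review note) the at<int64_t> read above has already failed, so the
+                // value is stored as a double; requiring std::floor(value) == value
+                // keeps integral doubles like 1.0 while rejecting fractional ones.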
+ return std::floor(value) == value && + terms.find(ValueType(value)) != terms.end(); } - res[i] = executor(offset); + return false; } + return terms.find(ValueType(x.value())) != terms.end(); }; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; + } + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + if (terms.empty()) { + res[i] = false; + continue; + } + res[i] = executor(offset); + } + }; int64_t processed_size; if (has_offset_input_) { processed_size = ProcessDataByOffsets(execute_sub_batch, @@ -775,19 +775,19 @@ PhyTermFilterExpr::ExecVisitorImplForData(OffsetVector* input) { TargetBitmapView res, TargetBitmapView valid_res, const std::unordered_set& vals) { - TermElementFuncSet func; - for (size_t i = 0; i < size; ++i) { - auto offset = i; - if constexpr (filter_type == FilterType::random) { - offset = (offsets) ? offsets[i] : i; - } - if (valid_data != nullptr && !valid_data[offset]) { - res[i] = valid_res[i] = false; - continue; - } - res[i] = func(vals, data[offset]); + TermElementFuncSet func; + for (size_t i = 0; i < size; ++i) { + auto offset = i; + if constexpr (filter_type == FilterType::random) { + offset = (offsets) ? offsets[i] : i; } - }; + if (valid_data != nullptr && !valid_data[offset]) { + res[i] = valid_res[i] = false; + continue; + } + res[i] = func(vals, data[offset]); + } + }; int64_t processed_size; if (has_offset_input_) { processed_size = ProcessDataByOffsets(execute_sub_batch, diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp index 4ece8ddd86f33..6d391b9f83b83 100644 --- a/internal/core/src/exec/expression/UnaryExpr.cpp +++ b/internal/core/src/exec/expression/UnaryExpr.cpp @@ -276,145 +276,144 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(OffsetVector* input) { if (expr_->column_.nested_path_.size() > 0) { index = std::stoi(expr_->column_.nested_path_[0]); } - auto execute_sub_batch = - [op_type]( - const milvus::ArrayView* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ValueType val, - int index) { - switch (op_type) { - case proto::plan::GreaterThan: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::GreaterEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::LessThan: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::LessEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::Equal: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::NotEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::PrefixMatch: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::Match: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, 
- res, - valid_res, - offsets); - break; - } - default: - PanicInfo( - OpTypeInvalid, - fmt::format( - "unsupported operator type for unary expr: {}", - op_type)); + auto execute_sub_batch = [op_type]( + const milvus::ArrayView* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ValueType val, + int index) { + switch (op_type) { + case proto::plan::GreaterThan: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; } - }; + case proto::plan::GreaterEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::LessThan: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::LessEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::Equal: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::NotEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::PrefixMatch: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::Match: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + default: + PanicInfo( + OpTypeInvalid, + fmt::format("unsupported operator type for unary expr: {}", + op_type)); + } + }; int64_t processed_size; if (has_offset_input_) { processed_size = @@ -480,7 +479,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) { }; } else { auto size_per_chunk = segment_->size_per_chunk(); - retrieve = [size_per_chunk, this](int64_t offset) -> auto { + retrieve = [ size_per_chunk, this ](int64_t offset) -> auto { auto chunk_idx = offset / size_per_chunk; auto chunk_offset = offset % size_per_chunk; const auto& chunk = @@ -605,15 +604,15 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = (cmp); \ } while (false) - auto execute_sub_batch = [op_type, pointer]( - const milvus::Json* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ExprValueType val) { + auto execute_sub_batch = + [ op_type, pointer ]( + const milvus::Json* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ExprValueType val) { switch (op_type) { case proto::plan::GreaterThan: { for (size_t i = 0; i < size; ++i) { @@ -1123,13 +1122,13 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { auto execute_sub_batch = [expr_type]( - const T* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - IndexInnerType val) { + const T* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + IndexInnerType val) { switch (expr_type) { case proto::plan::GreaterThan: { UnaryElementFunc func; diff --git 
a/internal/core/src/exec/expression/UnaryExpr.h b/internal/core/src/exec/expression/UnaryExpr.h index f4c368ab58f50..d69e8c0270c95 100644 --- a/internal/core/src/exec/expression/UnaryExpr.h +++ b/internal/core/src/exec/expression/UnaryExpr.h @@ -356,7 +356,7 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr { template VectorPtr ExecRangeVisitorImplJsonForIndex(); - + template VectorPtr ExecRangeVisitorImplArray(OffsetVector* input = nullptr); diff --git a/internal/core/src/index/JsonKeyInvertedIndex.cpp b/internal/core/src/index/JsonKeyInvertedIndex.cpp index e76644c2d5349..cbf3d1af7cfb3 100644 --- a/internal/core/src/index/JsonKeyInvertedIndex.cpp +++ b/internal/core/src/index/JsonKeyInvertedIndex.cpp @@ -26,8 +26,6 @@ JsonKeyInvertedIndex::AddInvertedRecord(const std::vector& paths, uint16_t offset, uint16_t length) { auto key = std::string("/") + Join(paths, "."); - std::cout << "xxx insert inverted key" << key << "rowid" << row_id - << "offset" << offset << "length" << length << std::endl; LOG_DEBUG( "insert inverted key: {}, row_id: {}, offset: " "{}, length:{}", @@ -129,6 +127,7 @@ JsonKeyInvertedIndex::AddJson(const char* json, int64_t offset) { int index = 0; std::vector paths; TravelJson(json, tokens, index, paths, offset); + free(tokens); } JsonKeyInvertedIndex::JsonKeyInvertedIndex( diff --git a/internal/core/src/index/JsonKeyInvertedIndex.h b/internal/core/src/index/JsonKeyInvertedIndex.h index b686f63897f2e..220374ccb504f 100644 --- a/internal/core/src/index/JsonKeyInvertedIndex.h +++ b/internal/core/src/index/JsonKeyInvertedIndex.h @@ -25,7 +25,7 @@ class JsonKeyInvertedIndex : public InvertedIndexTantivy { explicit JsonKeyInvertedIndex(const storage::FileManagerContext& ctx, bool is_load); - ~JsonKeyInvertedIndex() override {}; + ~JsonKeyInvertedIndex() override{}; public: BinarySet diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index 8d3b74cda2943..0daaab7cf76a9 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -282,8 +282,8 @@ BuildJsonKeyIndex(CBinarySet* c_binary_set, auto field_schema = FieldMeta::ParseFrom(build_index_info->field_schema()); - auto index = - std::make_unique(fileManagerContext, false); + auto index = std::make_unique( + fileManagerContext, false); index->Build(config); auto binary = std::make_unique(index->Upload(config)); diff --git a/internal/core/src/mmap/ChunkedColumn.h b/internal/core/src/mmap/ChunkedColumn.h index db2bfc278f37f..f35b9862e00b6 100644 --- a/internal/core/src/mmap/ChunkedColumn.h +++ b/internal/core/src/mmap/ChunkedColumn.h @@ -59,7 +59,7 @@ class ChunkedColumnBase : public ColumnBase { } } - virtual ~ChunkedColumnBase() {}; + virtual ~ChunkedColumnBase(){}; virtual void AppendBatch(const FieldDataPtr data) override { diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index df8365f3da28d..6a069b859abec 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -5281,387 +5281,4 @@ TEST(CApiTest, RANGE_SEARCH_WITH_RADIUS_AND_RANGE_FILTER_WHEN_IP_BFLOAT16) { TEST(CApiTest, IsLoadWithDisk) { ASSERT_TRUE(IsLoadWithDisk(INVERTED_INDEX_TYPE, 0)); -} - -// 1000 keys -std::string -GenerateJson(int N) { - std::vector data(N); - std::default_random_engine er(67); - std::normal_distribution<> distr(0, 1); - std::vector keys; - for (int i = 0; i < N; i++) { - keys.push_back("keys" + std::to_string(i)); - } - std::string json_string; - std::vector 
values(N); - for (int i = 0; i < N; i++) { - if (i % 7 == 0 || i % 7 == 4) { - values[i] = std::to_string(er()); - } else if (i % 7 == 1 || i % 7 == 5) { - values[i] = std::to_string(static_cast(er())); - } else if (i % 7 == 2 || i % 7 == 6) { - values[i] = er() / 2 == 0 ? "true" : "false"; - } else if (i % 7 == 3) { - values[i] = "\"xxxx" + std::to_string(i) + "\""; - // } else if (i % 7 == 4) { - // std::vector intvec(10); - // for (int j = 0; j < 10; j++) { - // intvec[j] = std::to_string(i + j); - // } - // values[i] = "[" + join(intvec, ",") + "]"; - // } else if (i % 7 == 5) { - // std::vector doublevec(10); - // for (int j = 0; j < 10; j++) { - // doublevec[j] = - // std::to_string(static_cast(i + j + er())); - // } - // values[i] = "[" + join(doublevec, ",") + "]"; - // } else if (i % 7 == 6) { - // std::vector stringvec(10); - // for (int j = 0; j < 10; j++) { - // stringvec[j] = "\"xxx" + std::to_string(j) + "\""; - // } - // values[i] = "[" + join(stringvec, ",") + "]"; - } - } - json_string += "{"; - for (int i = 0; i < N - 1; i++) { - json_string += R"(")" + keys[i] + R"(":)" + values[i] + R"(,)"; - } - json_string += R"(")" + keys[N - 1] + R"(":)" + values[N - 1]; - json_string += "}"; - return json_string; -} - -void -ParseJson(const std::string& json) { - jsmn_parser p; - jsmntok_t t[2002]; - - jsmn_init(&p); - int r = jsmn_parse( - &p, json.c_str(), strlen(json.c_str()), t, sizeof(t) / sizeof(t[0])); - if (r < 0) { - printf("Failed to parse JSON: %d\n", r); - return; - } - if (r < 1 || t[0].type != JSMN_OBJECT) { - printf("Object expected\n"); - return; - } - //std::cout << r << std::endl; -} - -TEST(CApiTest, test_parse_perform) { - for (int i = 0; i < 10000; i++) { - { - int64_t all_cost = 0; - for (int j = 0; j < 10000; j++) { - auto json_string = GenerateJson(1000); - if (j == 0) { - std::cout << json_string.size() << std::endl; - } - //std::cout << json_string << std::endl; - auto start = std::chrono::steady_clock::now(); - ParseJson(json_string); - all_cost += - std::chrono::duration_cast( - std::chrono::steady_clock::now() - start) - .count(); - } - std::cout << "cost: " << all_cost << "us" << std::endl; - } - { - int64_t all_cost = 0; - for (int j = 0; j < 10000; j++) { - auto json_string = GenerateJson(100); - if (j == 0) { - std::cout << json_string.size() << std::endl; - } - //std::cout << json_string << std::endl; - auto start = std::chrono::steady_clock::now(); - ParseJson(json_string); - all_cost += - std::chrono::duration_cast( - std::chrono::steady_clock::now() - start) - .count(); - } - std::cout << "cost: " << all_cost << "us" << std::endl; - } - { - int64_t all_cost = 0; - for (int j = 0; j < 10000; j++) { - auto json_string = GenerateJson(50); - if (j == 0) { - std::cout << json_string.size() << std::endl; - } - auto start = std::chrono::steady_clock::now(); - ParseJson(json_string); - all_cost += - std::chrono::duration_cast( - std::chrono::steady_clock::now() - start) - .count(); - } - std::cout << "cost: " << all_cost << "us" << std::endl; - } - } -} - -void -extract_key_value_pairs(const char* json, size_t len) { - jsmn_parser parser; - jsmntok_t* tokens = - (jsmntok_t*)malloc(16 * sizeof(jsmntok_t)); // Initial allocation - if (!tokens) { - fprintf(stderr, "Memory allocation failed\n"); - return; - } - int num_tokens = 0; - int token_capacity = 16; - - // Initialize the parser - jsmn_init(&parser); - - size_t pos = 0; - while (pos < len) { - size_t chunk_size = - len - pos > 256 ? 
256 : len - pos; // Read in chunks of 256 bytes - int r = - jsmn_parse(&parser, json + pos, chunk_size, tokens, token_capacity); - if (r < 0) { - if (r == JSMN_ERROR_NOMEM) { - // Reallocate tokens array if not enough space - token_capacity *= 2; // Double the capacity - tokens = (jsmntok_t*)realloc( - tokens, token_capacity * sizeof(jsmntok_t)); - if (!tokens) { - fprintf(stderr, "Memory reallocation failed\n"); - return; - } - continue; // Try parsing again - } else { - fprintf(stderr, "Failed to parse JSON: %d\n", r); - free(tokens); - return; - } - } - - // Update the position - pos += chunk_size; - } - - // Iterate through the tokens - for (int i = 0; i < parser.toknext; i++) { - if (tokens[i].type == JSMN_OBJECT) { - for (int j = 0; j < tokens[i].size; j++) { - // The next token is the key (string) - j++; - printf("Key: %.*s\n", - tokens[j].end - tokens[j].start, - json + tokens[j].start); - - // The next token is the value - j++; - printf("Value: %.*s\n", - tokens[j].end - tokens[j].start, - json + tokens[j].start); - } - } - } - - // Clean up - free(tokens); -} - -void -TravelJson(const char* json, - jsmntok* tokens, - int& index, - std::vector& path) { - jsmntok current = tokens[0]; - if (current.type == JSMN_OBJECT) { - int j = 1; - for (int i = 0; i < current.size; i++) { - assert(tokens[j].type == JSMN_STRING && tokens[j].size != 0); - std::string key(json + tokens[j].start, - tokens[j].end - tokens[j].start); - path.push_back(key); - j++; - int consumed = 0; - TravelJson(json, tokens + j, consumed, path); - path.pop_back(); - j += consumed; - } - index = j; - } else if (current.type == JSMN_PRIMITIVE) { - std::cout << "key:" << Join(path, ".") << "values:" - << std::string(json + current.start, - current.end - current.start) - << std::endl; - index++; - } else if (current.type == JSMN_ARRAY) { - std::cout << "key:" << Join(path, ".") << "values:" - << std::string(json + current.start, - current.end - current.start) - << std::endl; - // skip next array parse - int count = current.size; - int j = 1; - while (count > 0) { - if (tokens[j].size == 0) { - count--; - } else { - count += tokens[j].size; - } - j++; - } - index = j; - - } else if (current.type == JSMN_STRING) { - if (current.size == 0) { - std::cout << "key:" << Join(path, ".") << " values:" - << std::string(json + current.start, - current.end - current.start) - << std::endl; - index++; - } else { - throw std::runtime_error("not should happen"); - } - } else { - throw std::runtime_error("not should happen"); - } -} - -void -extract_key_value_pairs(const char* json) { - jsmn_parser parser; - jsmntok_t* tokens = - (jsmntok_t*)malloc(16 * sizeof(jsmntok_t)); // Initial allocation - if (!tokens) { - fprintf(stderr, "Memory allocation failed\n"); - return; - } - int num_tokens = 0; - int token_capacity = 16; - - // Initialize the parser - jsmn_init(&parser); - - // Parse the JSON string - while (1) { - int r = jsmn_parse(&parser, json, strlen(json), tokens, token_capacity); - if (r < 0) { - if (r == JSMN_ERROR_NOMEM) { - // Reallocate tokens array if not enough space - token_capacity *= 2; // Double the capacity - tokens = (jsmntok_t*)realloc( - tokens, token_capacity * sizeof(jsmntok_t)); - if (!tokens) { - fprintf(stderr, "Memory reallocation failed\n"); - return; - } - continue; // Try parsing again - } else { - fprintf(stderr, "Failed to parse JSON: %d\n", r); - free(tokens); - return; - } - } - num_tokens = r; - break; // Exit the loop if parsing was successful - } - - std::cout << "num_tokens:" << num_tokens << 
std::endl; - // Iterate through the tokens - for (int i = 0; i < num_tokens; i++) { - std::cout << "i:" << i << "type: " << tokens[i].type - << "token size:" << tokens[i].size << std::endl; - printf("value: %.*s\n", - tokens[i].end - tokens[i].start, - json + tokens[i].start); - } - - std::cout << "-----------------" << std::endl; - int index = 0; - std::vector path; - TravelJson(json, tokens, index, path); - - // Clean up - free(tokens); -} - -void -extract_json(const char* json) { - jsmn_parser parser; - jsmntok_t* tokens = - (jsmntok_t*)malloc(16 * sizeof(jsmntok_t)); // Initial allocation - if (!tokens) { - fprintf(stderr, "Memory allocation failed\n"); - return; - } - int num_tokens = 0; - int token_capacity = 16; - - // Initialize the parser - jsmn_init(&parser); - - // Parse the JSON string - while (1) { - int r = jsmn_parse(&parser, json, strlen(json), tokens, token_capacity); - if (r < 0) { - if (r == JSMN_ERROR_NOMEM) { - // Reallocate tokens array if not enough space - token_capacity *= 2; // Double the capacity - tokens = (jsmntok_t*)realloc( - tokens, token_capacity * sizeof(jsmntok_t)); - if (!tokens) { - fprintf(stderr, "Memory reallocation failed\n"); - return; - } - continue; // Try parsing again - } else { - fprintf(stderr, "Failed to parse JSON: %d\n", r); - free(tokens); - return; - } - } - num_tokens = r; - break; // Exit the loop if parsing was successful - } - - // assert(tokens[0].type == JSMN_OBJECT); - - // Iterate through the tokens - for (int i = 0; i < num_tokens; i++) { - std::cout << "i:" << i << "type: " << tokens[i].type - << "token size:" << tokens[i].size << std::endl; - printf("value: %.*s\n", - tokens[i].end - tokens[i].start, - json + tokens[i].start); - } - - // Clean up - free(tokens); -} - -TEST(CApiTest, test_jsmn_function) { - int64_t all_cost = 0; - // auto json_string = GenerateJson(50); - // std::cout << json_string << std::endl; - // extract_key_value_pairs(json_string.c_str()); - - std::string json_string = - R"({"keys0": ["value0", 234, "values1"], "keys1": ["value3", 1235]})"; - std::cout << json_string << std::endl; - extract_key_value_pairs(json_string.c_str()); - - json_string = - R"({"keys0": [{"keys1": 1234, "keys2": "xxx"}, {"keys3": 567, "keys4": "xxxxx"}]})"; - std::cout << json_string << std::endl; - extract_key_value_pairs(json_string.c_str()); - - json_string = R"({"keys0": {"keys1": { "keys2": "xxx", "keys3" :1234}}})"; - std::cout << json_string << std::endl; - extract_key_value_pairs(json_string.c_str()); -} +} \ No newline at end of file diff --git a/internal/core/unittest/test_json_key_index.cpp b/internal/core/unittest/test_json_key_index.cpp index c128c78909535..dc8089be1e042 100644 --- a/internal/core/unittest/test_json_key_index.cpp +++ b/internal/core/unittest/test_json_key_index.cpp @@ -24,82 +24,22 @@ #include "test_utils/indexbuilder_test_utils.h" #include "index/Meta.h" #include "index/JsonKeyInvertedIndex.h" - +#include "common/Json.h" +#include "common/Types.h" using namespace milvus::index; using namespace milvus::indexbuilder; using namespace milvus; using namespace milvus::index; -std::string -join(const std::vector& vec, const std::string& delimiter) { - std::ostringstream oss; - for (size_t i = 0; i < vec.size(); ++i) { - oss << vec[i]; - if (i != vec.size() - 1) { - oss << delimiter; - } - } - return oss.str(); -} - -// 1000 keys -static std::string -GenerateJson(int N) { - std::vector data(N); - std::default_random_engine er(42); - std::normal_distribution<> distr(0, 1); - std::vector keys; - for (int i = 
0; i < N; i++) { - keys.push_back("keys" + std::to_string(i)); - } - std::string json_string; - std::vector values(N); - for (int i = 0; i < N; i++) { - if (i % 7 == 0) { - values[i] = std::to_string(er()); - } else if (i % 7 == 1) { - values[i] = std::to_string(static_cast(er())); - } else if (i % 7 == 2) { - values[i] = er() / 2 == 0 ? "true" : "false"; - } else if (i % 7 == 3) { - values[i] = "\"xxxx" + std::to_string(i) + "\""; - } else if (i % 7 == 4) { - std::vector intvec(10); - for (int j = 0; j < 10; j++) { - intvec[j] = std::to_string(i + j); - } - values[i] = "[" + join(intvec, ",") + "]"; - } else if (i % 7 == 5) { - std::vector doublevec(10); - for (int j = 0; j < 10; j++) { - doublevec[j] = - std::to_string(static_cast(i + j + er())); - } - values[i] = "[" + join(doublevec, ",") + "]"; - } else if (i % 7 == 6) { - std::vector stringvec(10); - for (int j = 0; j < 10; j++) { - stringvec[j] = "\"xxx" + std::to_string(j) + "\""; - } - values[i] = "[" + join(stringvec, ",") + "]"; - } - } - json_string += "{"; - for (int i = 0; i < N - 1; i++) { - json_string += R"(")" + keys[i] + R"(":)" + values[i] + R"(,)"; - } - json_string += R"(")" + keys[N - 1] + R"(":)" + values[N - 1]; - json_string += "}"; - return json_string; -} - -static std::vector -GenerateJsons(int size, int dim) { +static std::vector +GenerateJsons(int size) { std::vector jsons; - for (int i = 0; i < size; ++i) { - std::cout << GenerateJson(dim) << std::endl; - jsons.push_back( - milvus::Json(simdjson::padded_string(GenerateJson(dim)))); + for (int i = 0; i < size; i++) { + auto str = R"({"int":)" + std::to_string(random()) + R"(,"double":)" + + std::to_string(static_cast(random())) + + R"(,"string":")" + std::to_string(random()) + + R"(","bool": true)" + R"(, "array": [1,2,3])" + "}"; + jsons.push_back(milvus::Json(simdjson::padded_string(str))); } return jsons; } @@ -113,8 +53,7 @@ class JsonKeyIndexTest : public testing::Test { int64_t field_id, int64_t index_build_id, int64_t index_version, - int64_t size, - int64_t dim) { + int64_t size) { proto::schema::FieldSchema field_schema; field_schema.set_data_type(proto::schema::DataType::JSON); @@ -123,7 +62,7 @@ class JsonKeyIndexTest : public testing::Test { auto index_meta = storage::IndexMeta{ segment_id, field_id, index_build_id, index_version}; - data_ = std::move(GenerateJsons(size, dim)); + data_ = std::move(GenerateJsons(size)); auto field_data = storage::CreateFieldData(DataType::JSON); field_data->FillFieldData(data_.data(), data_.size()); storage::InsertData insert_data(field_data); @@ -177,8 +116,7 @@ class JsonKeyIndexTest : public testing::Test { int64_t field_id = 101; int64_t index_build_id = 1000; int64_t index_version = 10000; - size_ = 10; - dim_ = 10; + size_ = 10000; std::string root_path = "/tmp/test-jsonkey-index/"; storage::StorageConfig storage_config; @@ -192,8 +130,7 @@ class JsonKeyIndexTest : public testing::Test { field_id, index_build_id, index_version, - size_, - dim_); + size_); } virtual ~JsonKeyIndexTest() override { @@ -204,59 +141,42 @@ class JsonKeyIndexTest : public testing::Test { void TestTermInFunc() { { - std::vector> testcases{{"705894"}}; + struct Testcase { + std::vector term; + std::vector nested_path; + }; + std::vector testcases{ + {{1, 2, 3, 4}, {"int"}}, + {{10, 100, 1000, 10000}, {"int"}}, + {{100, 10000, 9999, 444}, {"int"}}, + {{23, 42, 66, 17, 25}, {"int"}}, + }; for (auto testcase : testcases) { - auto check = [&](std::string value) { - std::unordered_set term_set(testcase.begin(), - testcase.end()); + auto 
check = [&](int64_t value) { + std::unordered_set term_set(testcase.term.begin(), + testcase.term.end()); return term_set.find(value) != term_set.end(); }; - std::unordered_set term_set(testcase.begin(), - testcase.end()); + std::unordered_set term_set(testcase.term.begin(), + testcase.term.end()); auto filter_func = [&term_set, this](uint32_t row_id, uint16_t offset, uint16_t size) { - auto val = - this->data_[row_id].template at_pos( - offset, size); + auto val = this->data_[row_id].template at_pos( + offset, size); if (val.second != "") { return false; } - return term_set.find((std::string(val.first))) != + return term_set.find((int64_t(val.first))) != term_set.end(); }; - auto bitset = - index_->FilterByPath("/keys0", size_, filter_func); - + auto pointer = milvus::Json::pointer(testcase.nested_path); + auto bitset = index_->FilterByPath(pointer, size_, filter_func); ASSERT_EQ(bitset.size(), size_); for (int i = 0; i < bitset.size(); ++i) { + auto val = data_[i].template at(pointer).value(); auto ans = bitset[i]; - auto ref = check("705894"); - ASSERT_EQ(ans, ref); - } - } - } - { - std::vector testcases{"true"}; - for (auto& value : testcases) { - auto filter_func = [this, &value](uint32_t row_id, - uint16_t offset, - uint16_t size) { - auto val = - this->data_[row_id].template at_pos( - offset, size); - if (val.second != "") { - return false; - } - return std::string(val.first) == value; - }; - - auto bitset = - index_->FilterByPath("/keys2", size_, filter_func); - ASSERT_EQ(bitset.size(), size_); - for (int i = 0; i < bitset.size(); ++i) { - auto ans = bitset[i]; - auto ref = (value == "false"); + auto ref = check(val); ASSERT_EQ(ans, ref); } } @@ -264,7 +184,16 @@ class JsonKeyIndexTest : public testing::Test { } void TestUnaryRangeInFunc() { - std::vector testcases{"10", "705894", "805894"}; + struct Testcase { + int64_t val; + std::vector nested_path; + }; + std::vector testcases{ + {10, {"int"}}, + {20, {"int"}}, + {30, {"int"}}, + {40, {"int"}}, + }; std::vector ops{ OpType::Equal, OpType::NotEqual, @@ -274,40 +203,40 @@ class JsonKeyIndexTest : public testing::Test { OpType::LessEqual, }; for (const auto& testcase : testcases) { - auto check = [&](std::string value) { return value == testcase; }; - std::function f = check; + auto check = [&](int64_t value) { return value == testcase.val; }; + std::function f = check; for (auto& op : ops) { switch (op) { case OpType::Equal: { - f = [&](std::string value) { - return value == testcase; + f = [&](int64_t value) { + return value == testcase.val; }; break; } case OpType::NotEqual: { - f = [&](std::string value) { - return value != testcase; + f = [&](int64_t value) { + return value != testcase.val; }; break; } case OpType::GreaterEqual: { - f = [&](std::string value) { - return value >= testcase; + f = [&](int64_t value) { + return value >= testcase.val; }; break; } case OpType::GreaterThan: { - f = [&](std::string value) { return value > testcase; }; + f = [&](int64_t value) { return value > testcase.val; }; break; } case OpType::LessEqual: { - f = [&](std::string value) { - return value <= testcase; + f = [&](int64_t value) { + return value <= testcase.val; }; break; } case OpType::LessThan: { - f = [&](std::string value) { return value < testcase; }; + f = [&](int64_t value) { return value < testcase.val; }; break; } default: { @@ -318,35 +247,35 @@ class JsonKeyIndexTest : public testing::Test { auto filter_func = [&op, &testcase, this](uint32_t row_id, uint16_t offset, uint16_t size) { - auto val = - 
this->data_[row_id].template at_pos( - offset, size); + auto val = this->data_[row_id].template at_pos( + offset, size); if (val.second != "") { return false; } switch (op) { case OpType::GreaterThan: - return std::string(val.first) > testcase; + return int64_t(val.first) > testcase.val; case OpType::GreaterEqual: - return std::string(val.first) >= testcase; + return int64_t(val.first) >= testcase.val; case OpType::LessThan: - return std::string(val.first) < testcase; + return int64_t(val.first) < testcase.val; case OpType::LessEqual: - return std::string(val.first) <= testcase; + return int64_t(val.first) <= testcase.val; case OpType::Equal: - return std::string(val.first) == testcase; + return int64_t(val.first) == testcase.val; case OpType::NotEqual: - return std::string(val.first) != testcase; + return int64_t(val.first) != testcase.val; default: return false; } }; - auto bitset = - index_->FilterByPath("/keys0", size_, filter_func); + auto pointer = milvus::Json::pointer(testcase.nested_path); + auto bitset = index_->FilterByPath(pointer, size_, filter_func); ASSERT_EQ(bitset.size(), size_); for (int i = 0; i < bitset.size(); ++i) { auto ans = bitset[i]; - auto ref = f("705894"); + auto val = data_[i].template at(pointer).value(); + auto ref = f(val); ASSERT_EQ(ans, ref); } } @@ -358,17 +287,18 @@ class JsonKeyIndexTest : public testing::Test { struct Testcase { bool lower_inclusive; bool upper_inclusive; - std::string lower; - std::string upper; + int64_t lower; + int64_t upper; + std::vector nested_path; }; std::vector testcases{ - {true, false, "10", "20"}, - {true, true, "20", "30"}, - {false, true, "30", "40"}, - {false, false, "40", "50"}, + {true, false, 10, 20, {"int"}}, + {true, true, 20, 30, {"int"}}, + {false, true, 30, 40, {"int"}}, + {false, false, 40, 50, {"int"}}, }; for (const auto& testcase : testcases) { - auto check = [&](std::string value) { + auto check = [&](int64_t value) { if (testcase.lower_inclusive && testcase.upper_inclusive) { return testcase.lower <= value && value <= testcase.upper; } else if (testcase.lower_inclusive && @@ -386,43 +316,190 @@ class JsonKeyIndexTest : public testing::Test { uint16_t offset, uint16_t size) { auto val = - this->data_[row_id].template at_pos( - offset, size); + this->data_[row_id].template at_pos(offset, size); if (val.second != "") { return false; } if (testcase.lower_inclusive && testcase.upper_inclusive) { - return testcase.lower <= std::string(val.first) && - std::string(val.first) <= testcase.upper; + return testcase.lower <= int64_t(val.first) && + int64_t(val.first) <= testcase.upper; } else if (testcase.lower_inclusive && !testcase.upper_inclusive) { - return testcase.lower <= std::string(val.first) && - std::string(val.first) < testcase.upper; + return testcase.lower <= int64_t(val.first) && + int64_t(val.first) < testcase.upper; } else if (!testcase.lower_inclusive && testcase.upper_inclusive) { - return testcase.lower < std::string(val.first) && - std::string(val.first) <= testcase.upper; + return testcase.lower < int64_t(val.first) && + int64_t(val.first) <= testcase.upper; } else { - return testcase.lower < std::string(val.first) && - std::string(val.first) < testcase.upper; + return testcase.lower < int64_t(val.first) && + int64_t(val.first) < testcase.upper; } }; - auto bitset = index_->FilterByPath("/keys7", size_, filter_func); + auto pointer = milvus::Json::pointer(testcase.nested_path); + auto bitset = index_->FilterByPath(pointer, size_, filter_func); ASSERT_EQ(bitset.size(), size_); for (int i = 0; i < 
@@ -358,17 +287,18 @@ class JsonKeyIndexTest : public testing::Test {
         struct Testcase {
             bool lower_inclusive;
             bool upper_inclusive;
-            std::string lower;
-            std::string upper;
+            int64_t lower;
+            int64_t upper;
+            std::vector<std::string> nested_path;
         };
         std::vector<Testcase> testcases{
-            {true, false, "10", "20"},
-            {true, true, "20", "30"},
-            {false, true, "30", "40"},
-            {false, false, "40", "50"},
+            {true, false, 10, 20, {"int"}},
+            {true, true, 20, 30, {"int"}},
+            {false, true, 30, 40, {"int"}},
+            {false, false, 40, 50, {"int"}},
         };
         for (const auto& testcase : testcases) {
-            auto check = [&](std::string value) {
+            auto check = [&](int64_t value) {
                 if (testcase.lower_inclusive && testcase.upper_inclusive) {
                     return testcase.lower <= value && value <= testcase.upper;
                 } else if (testcase.lower_inclusive &&
@@ -386,43 +316,190 @@ class JsonKeyIndexTest : public testing::Test {
                                uint16_t offset,
                                uint16_t size) {
                 auto val =
-                    this->data_[row_id].template at_pos<std::string_view>(
-                        offset, size);
+                    this->data_[row_id].template at_pos<int64_t>(offset, size);
                 if (val.second != "") {
                     return false;
                 }
                 if (testcase.lower_inclusive && testcase.upper_inclusive) {
-                    return testcase.lower <= std::string(val.first) &&
-                           std::string(val.first) <= testcase.upper;
+                    return testcase.lower <= int64_t(val.first) &&
+                           int64_t(val.first) <= testcase.upper;
                 } else if (testcase.lower_inclusive &&
                            !testcase.upper_inclusive) {
-                    return testcase.lower <= std::string(val.first) &&
-                           std::string(val.first) < testcase.upper;
+                    return testcase.lower <= int64_t(val.first) &&
+                           int64_t(val.first) < testcase.upper;
                 } else if (!testcase.lower_inclusive &&
                            testcase.upper_inclusive) {
-                    return testcase.lower < std::string(val.first) &&
-                           std::string(val.first) <= testcase.upper;
+                    return testcase.lower < int64_t(val.first) &&
+                           int64_t(val.first) <= testcase.upper;
                 } else {
-                    return testcase.lower < std::string(val.first) &&
-                           std::string(val.first) < testcase.upper;
+                    return testcase.lower < int64_t(val.first) &&
+                           int64_t(val.first) < testcase.upper;
                 }
             };
-            auto bitset = index_->FilterByPath("/keys7", size_, filter_func);
+            auto pointer = milvus::Json::pointer(testcase.nested_path);
+            auto bitset = index_->FilterByPath(pointer, size_, filter_func);
             ASSERT_EQ(bitset.size(), size_);
             for (int i = 0; i < bitset.size(); ++i) {
                 auto ans = bitset[i];
-                auto ref = check("970724117");
+                auto val = data_[i].template at<int64_t>(pointer).value();
+                auto ref = check(val);
                 ASSERT_EQ(ans, ref);
             }
         }
     }
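The four-branch `check` lambda above is the whole semantic content of the binary-range test: which of the two bounds is inclusive. The same predicate as a self-contained Go sketch (names are illustrative, not Milvus APIs):

```go
package main

import "fmt"

// inRange mirrors the test's check lambda: the four combinations of
// inclusive/exclusive lower and upper bounds around a value.
func inRange(val, lower, upper int64, lowerInc, upperInc bool) bool {
	switch {
	case lowerInc && upperInc:
		return lower <= val && val <= upper
	case lowerInc && !upperInc:
		return lower <= val && val < upper
	case !lowerInc && upperInc:
		return lower < val && val <= upper
	default:
		return lower < val && val < upper
	}
}

func main() {
	// {true, false, 10, 20} from the testcases is the interval [10, 20)
	fmt.Println(inRange(10, 10, 20, true, false)) // true
	fmt.Println(inRange(20, 10, 20, true, false)) // false
}
```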
+    void
+    TestExistInFunc() {
+        struct Testcase {
+            std::vector<std::string> nested_path;
+        };
+        std::vector<Testcase> testcases{
+            {{"A"}},
+            {{"int"}},
+            {{"double"}},
+            {{"B"}},
+        };
+        for (const auto& testcase : testcases) {
+            auto pointer = milvus::Json::pointer(testcase.nested_path);
+            auto filter_func = [&pointer, this](uint32_t row_id,
+                                                uint16_t offset,
+                                                uint16_t size) {
+                return this->data_[row_id].exist(pointer);
+            };
+
+            auto bitset = index_->FilterByPath(pointer, size_, filter_func);
+            ASSERT_EQ(bitset.size(), size_);
+            for (int i = 0; i < bitset.size(); ++i) {
+                auto ans = bitset[i];
+                auto val = data_[i].exist(pointer);
+                ASSERT_EQ(ans, val);
+            }
+        }
+    }
+
+    void
+    TestJsonContainsAllFunc() {
+        {
+            std::vector<Testcase<int64_t>> testcases{
+                {{1, 10}, {"int"}},
+                {{10, 100}, {"int"}},
+                {{100, 1000}, {"int"}},
+                {{1000, 10}, {"int"}},
+                {{2, 4, 6, 8, 10}, {"int"}},
+                {{1, 2, 3, 4, 5}, {"int"}},
+            };
+            for (const auto& testcase : testcases) {
+                auto check = [&](const std::vector<int64_t>& values) {
+                    for (auto const& e : testcase.term) {
+                        if (std::find(values.begin(), values.end(), e) ==
+                            values.end()) {
+                            return false;
+                        }
+                    }
+                    return true;
+                };
+                auto pointer = milvus::Json::pointer(testcase.nested_path);
+                std::unordered_set<int64_t> elements;
+                for (auto const& element : testcase.term) {
+                    elements.insert(element);
+                }
+                auto filter_func = [&elements, this](uint32_t row_id,
+                                                     uint16_t offset,
+                                                     uint16_t size) {
+                    auto array = this->data_[row_id].array_at(offset, size);
+                    std::unordered_set<int64_t> tmp_elements(elements);
+                    for (auto&& it : array) {
+                        auto val = it.template get<int64_t>();
+                        if (val.error()) {
+                            continue;
+                        }
+                        tmp_elements.erase(val.value());
+                        if (tmp_elements.size() == 0) {
+                            return true;
+                        }
+                    }
+                    return tmp_elements.empty();
+                };
+
+                auto bitset = index_->FilterByPath(pointer, size_, filter_func);
+                ASSERT_EQ(bitset.size(), size_);
+                for (int i = 0; i < bitset.size(); ++i) {
+                    auto ans = bitset[i];
+                    auto array = data_[i].array_at(pointer);
+                    std::vector<int64_t> res;
+                    for (const auto& element : array) {
+                        res.push_back(element.template get<int64_t>());
+                    }
+                    ASSERT_EQ(ans, check(res));
+                }
+            }
+        }
+
+        {
+            std::vector<Testcase<bool>> bool_testcases{
+                {{true, true}, {"bool"}}, {{false, false}, {"bool"}}};
+            for (const auto& testcase : bool_testcases) {
+                auto check = [&](const std::vector<bool>& values) {
+                    for (auto const& e : testcase.term) {
+                        if (std::find(values.begin(), values.end(), e) ==
+                            values.end()) {
+                            return false;
+                        }
+                    }
+                    return true;
+                };
+                auto pointer = milvus::Json::pointer(testcase.nested_path);
+                std::unordered_set<bool> elements;
+                for (auto const& element : testcase.term) {
+                    elements.insert(element);
+                }
+                auto filter_func = [&elements, this](uint32_t row_id,
+                                                     uint16_t offset,
+                                                     uint16_t size) {
+                    auto array = this->data_[row_id].array_at(offset, size);
+                    std::unordered_set<bool> tmp_elements(elements);
+                    for (auto&& it : array) {
+                        auto val = it.template get<bool>();
+                        if (val.error()) {
+                            continue;
+                        }
+                        tmp_elements.erase(val.value());
+                        if (tmp_elements.size() == 0) {
+                            return true;
+                        }
+                    }
+
+                    return tmp_elements.empty();
+                };
+
+                auto bitset = index_->FilterByPath(pointer, size_, filter_func);
+                ASSERT_EQ(bitset.size(), size_);
+                for (int i = 0; i < bitset.size(); ++i) {
+                    auto ans = bitset[i];
+                    auto array = data_[i].array_at(pointer);
+                    std::vector<bool> res;
+                    for (const auto& element : array) {
+                        res.push_back(element.template get<bool>());
+                    }
+                    ASSERT_EQ(ans, check(res));
+                }
+            }
+        }
+    }
+
+    template <typename T>
+    struct Testcase {
+        std::vector<T> term;
+        std::vector<std::string> nested_path;
+        bool res;
+    };
+
  public:
     std::shared_ptr<JsonKeyInvertedIndex> index_;
     DataType type_;
     size_t size_;
-    size_t dim_;
     std::vector<milvus::Json> data_;
+    std::vector<std::string> json_col;
     std::shared_ptr<storage::ChunkManager> chunk_manager_;
 };

@@ -430,4 +507,5 @@ TEST_F(JsonKeyIndexTest, CountFuncTest) {
     TestTermInFunc();
     TestUnaryRangeInFunc();
     TestBinaryRangeInFunc();
+    TestExistInFunc();
 }
\ No newline at end of file
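The contains-all filters in TestJsonContainsAllFunc copy the required element set and erase each array element they encounter, succeeding as soon as the copy drains. The same idea as a self-contained Go sketch (stand-in names, not Milvus APIs):

```go
package main

import "fmt"

// containsAll reports whether every element of required appears in
// values, mirroring the erase-from-a-copy trick in the C++ filter:
// delete each seen value from a copied set and succeed once it drains.
func containsAll(required, values []int64) bool {
	pending := make(map[int64]struct{}, len(required))
	for _, e := range required {
		pending[e] = struct{}{}
	}
	for _, v := range values {
		delete(pending, v)
		if len(pending) == 0 {
			return true // early exit, as in the C++ loop
		}
	}
	return len(pending) == 0
}

func main() {
	fmt.Println(containsAll([]int64{1, 10}, []int64{10, 3, 1})) // true
	fmt.Println(containsAll([]int64{1, 10}, []int64{10, 3}))    // false
}
```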
diff --git a/internal/datacoord/job_manager.go b/internal/datacoord/job_manager.go
index 4ffc24c6702bf..3878c1fe95e09 100644
--- a/internal/datacoord/job_manager.go
+++ b/internal/datacoord/job_manager.go
@@ -145,7 +145,15 @@ func needDoJsonKeyIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool {
 	if !isFlush(segment) {
 		return false
 	}
-	return true
+	for _, fieldID := range fieldIDs {
+		if segment.GetJsonKeyStats() == nil {
+			return true
+		}
+		if segment.GetJsonKeyStats()[fieldID] == nil {
+			return true
+		}
+	}
+	return false
 }
 
 func needDoBM25(segment *SegmentInfo, fieldIDs []UniqueID) bool {
diff --git a/internal/datacoord/job_manager_test.go b/internal/datacoord/job_manager_test.go
index a0d95e4cd5b3d..bcba4dd77086c 100644
--- a/internal/datacoord/job_manager_test.go
+++ b/internal/datacoord/job_manager_test.go
@@ -62,6 +62,11 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() {
 						},
 					},
 				},
+				{
+					FieldID:  102,
+					Name:     "json",
+					DataType: schemapb.DataType_JSON,
+				},
 			},
 		},
 	},
@@ -117,5 +122,5 @@ func (s *jobManagerSuite) TestJobManager_triggerStatsTaskLoop() {
 
 	jm.loopWg.Wait()
 
-	s.Equal(2, len(jm.scheduler.tasks))
+	s.Equal(4, len(jm.scheduler.tasks))
 }
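The new needDoJsonKeyIndex predicate triggers a stats job only when a flushed segment is missing JSON key stats for at least one requested field. A self-contained Go sketch of the decision; segmentInfo is a pared-down stand-in for the datacoord type, tracking only presence of stats:

```go
package main

import "fmt"

// segmentInfo stands in for datacoord's SegmentInfo; only the bits the
// predicate inspects are modeled.
type segmentInfo struct {
	flushed      bool
	jsonKeyStats map[int64]struct{} // fieldID -> stats already built
}

// needDoJSONKeyIndex mirrors the diff: flushed segments qualify only if
// some requested field still has no JSON key stats.
func needDoJSONKeyIndex(seg segmentInfo, fieldIDs []int64) bool {
	if !seg.flushed {
		return false
	}
	for _, fieldID := range fieldIDs {
		if seg.jsonKeyStats == nil {
			return true
		}
		if _, ok := seg.jsonKeyStats[fieldID]; !ok {
			return true
		}
	}
	return false
}

func main() {
	seg := segmentInfo{flushed: true, jsonKeyStats: map[int64]struct{}{102: {}}}
	fmt.Println(needDoJSONKeyIndex(seg, []int64{102})) // false: already built
	fmt.Println(needDoJSONKeyIndex(seg, []int64{103})) // true: stats missing
}
```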
diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go
index 2948c87829f50..53aee5afdb765 100644
--- a/internal/indexnode/indexnode_service.go
+++ b/internal/indexnode/indexnode_service.go
@@ -516,7 +516,7 @@ func (i *IndexNode) QueryJobsV2(ctx context.Context, req *workerpb.QueryJobsV2Re
 					InsertLogs:       info.insertLogs,
 					StatsLogs:        info.statsLogs,
 					TextStatsLogs:    info.textStatsLogs,
-					Bm25Logs:     info.bm25Logs,
+					Bm25Logs:         info.bm25Logs,
 					NumRows:          info.numRows,
 					JsonKeyStatsLogs: info.jsonKeyStatsLogs,
 				})
diff --git a/internal/indexnode/task_stats.go b/internal/indexnode/task_stats.go
index 787fc32d6daa9..40f2dc93a1d4e 100644
--- a/internal/indexnode/task_stats.go
+++ b/internal/indexnode/task_stats.go
@@ -333,7 +333,7 @@ func (st *statsTask) Execute(ctx context.Context) error {
 			return err
 		}
 	} else if st.req.GetSubJobType() == indexpb.StatsSubJob_JsonKeyIndexJob {
-		err = st.createJsonKeyIndex(ctx,
+		err = st.createJSONKeyIndex(ctx,
 			st.req.GetStorageConfig(),
 			st.req.GetCollectionID(),
 			st.req.GetPartitionID(),
@@ -729,7 +729,7 @@ func (st *statsTask) createTextIndex(ctx context.Context,
 	return nil
 }
 
-func (st *statsTask) createJsonKeyIndex(ctx context.Context,
+func (st *statsTask) createJSONKeyIndex(ctx context.Context,
 	storageConfig *indexpb.StorageConfig,
 	collectionID int64,
 	partitionID int64,
@@ -773,7 +773,7 @@ func (st *statsTask) createJsonKeyIndex(ctx context.Context,
 	jsonKeyIndexStats := make(map[int64]*datapb.JsonKeyStats)
 	for _, field := range st.req.GetSchema().GetFields() {
 		h := typeutil.CreateFieldSchemaHelper(field)
-		if !h.EnableJsonKeyIndex() {
+		if !h.EnableJSONKeyIndex() {
 			continue
 		}
 		log.Info("field enable json key index, ready to create json key index", zap.Int64("field id", field.GetFieldID()))
@@ -794,7 +794,7 @@ func (st *statsTask) createJsonKeyIndex(ctx context.Context,
 		StorageConfig: newStorageConfig,
 	}
 
-	uploaded, err := indexcgowrapper.CreateJsonKeyIndex(ctx, buildIndexParams)
+	uploaded, err := indexcgowrapper.CreateJSONKeyIndex(ctx, buildIndexParams)
 	if err != nil {
 		return err
 	}
@@ -812,7 +812,7 @@ func (st *statsTask) createJsonKeyIndex(ctx context.Context,
 
 	totalElapse := st.tr.RecordSpan()
 
-	st.node.storeJsonKeyIndexResult(st.req.GetClusterID(),
+	st.node.storeJSONKeyIndexResult(st.req.GetClusterID(),
 		st.req.GetTaskID(),
 		st.req.GetCollectionID(),
 		st.req.GetPartitionID(),
diff --git a/internal/indexnode/taskinfo_ops.go b/internal/indexnode/taskinfo_ops.go
index f15050af4eb68..41538b0f74953 100644
--- a/internal/indexnode/taskinfo_ops.go
+++ b/internal/indexnode/taskinfo_ops.go
@@ -323,7 +323,7 @@ type statsTaskInfo struct {
 	insertLogs       []*datapb.FieldBinlog
 	statsLogs        []*datapb.FieldBinlog
 	textStatsLogs    map[int64]*datapb.TextIndexStats
-	bm25Logs     []*datapb.FieldBinlog
+	bm25Logs         []*datapb.FieldBinlog
 	jsonKeyStatsLogs map[int64]*datapb.JsonKeyStats
 }
 
@@ -411,14 +411,15 @@ func (i *IndexNode) storeStatsTextIndexResult(
 	}
 }
 
-func (i *IndexNode) storeJsonKeyIndexResult(
+func (i *IndexNode) storeJSONKeyIndexResult(
 	clusterID string,
 	taskID UniqueID,
 	collID UniqueID,
 	partID UniqueID,
 	segID UniqueID,
 	channel string,
-	jsonKeyIndexLogs map[int64]*datapb.JsonKeyStats) {
+	jsonKeyIndexLogs map[int64]*datapb.JsonKeyStats,
+) {
 	key := taskKey{ClusterID: clusterID, TaskID: taskID}
 	i.stateLock.Lock()
 	defer i.stateLock.Unlock()
@@ -448,7 +449,7 @@ func (i *IndexNode) getStatsTaskInfo(clusterID string, taskID UniqueID) *statsTa
 			insertLogs:       info.insertLogs,
 			statsLogs:        info.statsLogs,
 			textStatsLogs:    info.textStatsLogs,
-			bm25Logs:     info.bm25Logs,
+			bm25Logs:         info.bm25Logs,
 			jsonKeyStatsLogs: info.jsonKeyStatsLogs,
 		}
 	}
diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto
index 83982f8e8c3b4..fe5d499be837d 100644
--- a/internal/proto/query_coord.proto
+++ b/internal/proto/query_coord.proto
@@ -396,6 +396,7 @@ enum LoadScope {
     Full = 0;
     Delta = 1;
     Index = 2;
+    Stats = 3;
 }
 
 message LoadSegmentsRequest {
@@ -645,6 +646,7 @@ message SegmentVersionInfo {
     map<int64, FieldIndexInfo> index_info = 7;
     data.SegmentLevel level = 8;
     bool is_sorted = 9;
+    repeated int64 field_json_index_stats = 10;
 }
 
 message ChannelVersionInfo {
diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go
index 2cc46e5f1f11b..1baa0acbffe2a 100644
--- a/internal/querycoordv2/checkers/controller.go
+++ b/internal/querycoordv2/checkers/controller.go
@@ -72,6 +72,7 @@ func NewCheckerController(
 		// todo temporary work around must fix
 		// utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr, true),
 		utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr),
+		utils.StatsChecker:  NewStatsChecker(meta, dist, broker, nodeMgr, targetMgr),
 	}
 
 	manualCheckChs := map[utils.CheckerType]chan struct{}{
@@ -112,6 +113,8 @@ func getCheckerInterval(checker utils.CheckerType) time.Duration {
 		return Params.QueryCoordCfg.IndexCheckInterval.GetAsDuration(time.Millisecond)
 	case utils.LeaderChecker:
 		return Params.QueryCoordCfg.LeaderViewUpdateInterval.GetAsDuration(time.Second)
+	case utils.StatsChecker:
+		return Params.QueryCoordCfg.IndexCheckInterval.GetAsDuration(time.Millisecond)
 	default:
 		return Params.QueryCoordCfg.CheckInterval.GetAsDuration(time.Millisecond)
 	}
diff --git a/internal/querycoordv2/checkers/controller_base_test.go b/internal/querycoordv2/checkers/controller_base_test.go
index 0d8e301492b51..e66123b2aa38a 100644
--- a/internal/querycoordv2/checkers/controller_base_test.go
+++ b/internal/querycoordv2/checkers/controller_base_test.go
@@ -103,7 +103,7 @@ func (s *ControllerBaseTestSuite) TestActivation() {
 
 func (s *ControllerBaseTestSuite) TestListCheckers() {
 	checkers := s.controller.Checkers()
-	s.Equal(5, len(checkers))
+	s.Equal(6, len(checkers))
 }
 
 func TestControllerBaseTestSuite(t *testing.T) {
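Wiring a new checker type touches three places in this diff: the constructor map in NewCheckerController, the name table in utils/checker.go (further below), and the interval switch, where the stats checker deliberately reuses IndexCheckInterval instead of adding a new config knob. A simplified, self-contained Go sketch of that wiring; the concrete durations are stand-ins, not Milvus config values:

```go
package main

import (
	"fmt"
	"time"
)

type CheckerType int32

const (
	ChannelChecker CheckerType = iota + 1
	SegmentChecker
	BalanceChecker
	IndexChecker
	LeaderChecker
	StatsChecker // new: drives JSON key stats loading
)

var checkerNames = map[CheckerType]string{
	ChannelChecker: "channel_checker",
	SegmentChecker: "segment_checker",
	BalanceChecker: "balance_checker",
	IndexChecker:   "index_checker",
	LeaderChecker:  "leader_checker",
	StatsChecker:   "stats_checker",
}

// interval mirrors getCheckerInterval: the stats checker runs on the
// index checker's cadence rather than introducing a new parameter.
func interval(c CheckerType) time.Duration {
	switch c {
	case IndexChecker, StatsChecker:
		return 100 * time.Millisecond // stand-in for IndexCheckInterval
	default:
		return time.Second // stand-in for CheckInterval
	}
}

func main() {
	fmt.Println(checkerNames[StatsChecker], interval(StatsChecker))
}
```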
diff --git a/internal/querycoordv2/checkers/stats_checker.go b/internal/querycoordv2/checkers/stats_checker.go
new file mode 100644
index 0000000000000..c580fc158662d
--- /dev/null
+++ b/internal/querycoordv2/checkers/stats_checker.go
@@ -0,0 +1,193 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package checkers
+
+import (
+	"context"
+	"time"
+
+	"github.com/samber/lo"
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
+	"github.com/milvus-io/milvus/internal/querycoordv2/params"
+	"github.com/milvus-io/milvus/internal/querycoordv2/session"
+	"github.com/milvus-io/milvus/internal/querycoordv2/task"
+	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+var _ Checker = (*StatsChecker)(nil)
+
+// StatsChecker performs segment stats index checks.
+type StatsChecker struct {
+	*checkerActivation
+	meta    *meta.Meta
+	dist    *meta.DistributionManager
+	broker  meta.Broker
+	nodeMgr *session.NodeManager
+
+	targetMgr meta.TargetManagerInterface
+}
+
+func NewStatsChecker(
+	meta *meta.Meta,
+	dist *meta.DistributionManager,
+	broker meta.Broker,
+	nodeMgr *session.NodeManager,
+	targetMgr meta.TargetManagerInterface,
+) *StatsChecker {
+	return &StatsChecker{
+		checkerActivation: newCheckerActivation(),
+		meta:              meta,
+		dist:              dist,
+		broker:            broker,
+		nodeMgr:           nodeMgr,
+		targetMgr:         targetMgr,
+	}
+}
+
+func (c *StatsChecker) ID() utils.CheckerType {
+	return utils.StatsChecker
+}
+
+func (c *StatsChecker) Description() string {
+	return "StatsChecker checks stats state change of segments and generates load stats task"
+}
+
+func (c *StatsChecker) Check(ctx context.Context) []task.Task {
+	if !c.IsActive() {
+		return nil
+	}
+	collectionIDs := c.meta.CollectionManager.GetAll(ctx)
+	log.Info("StatsChecker", zap.Any("collectionIDs", collectionIDs))
+	var tasks []task.Task
+
+	for _, collectionID := range collectionIDs {
+		resp, err := c.broker.DescribeCollection(ctx, collectionID)
+		if err != nil {
+			log.Warn("failed to describe collection during stats check", zap.Int64("collection", collectionID), zap.Error(err))
+			continue
+		}
+		collection := c.meta.CollectionManager.GetCollection(ctx, collectionID)
+		if collection == nil {
+			log.Warn("collection released during check stats", zap.Int64("collection", collectionID))
+			continue
+		}
+		replicas := c.meta.ReplicaManager.GetByCollection(ctx, collectionID)
+		for _, replica := range replicas {
+			tasks = append(tasks, c.checkReplica(ctx, collection, replica, resp)...)
+		}
+	}
+
+	return tasks
+}
+
+func (c *StatsChecker) checkReplica(ctx context.Context, collection *meta.Collection, replica *meta.Replica, resp *milvuspb.DescribeCollectionResponse) []task.Task {
+	log := log.Ctx(ctx).With(
+		zap.Int64("collectionID", collection.GetCollectionID()),
+	)
+	var tasks []task.Task
+	segments := c.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithReplica(replica))
+	idSegments := make(map[int64]*meta.Segment)
+	roNodeSet := typeutil.NewUniqueSet(replica.GetRONodes()...)
+	targets := make(map[int64][]int64) // segment ID => missing JSON field IDs
+	for _, segment := range segments {
+		// skip update index in read only node
+		if roNodeSet.Contain(segment.Node) {
+			continue
+		}
+
+		// skip update index for l0 segment
+		segmentInTarget := c.targetMgr.GetSealedSegment(ctx, collection.GetCollectionID(), segment.GetID(), meta.CurrentTargetFirst)
+		if segmentInTarget == nil || segmentInTarget.GetLevel() == datapb.SegmentLevel_L0 {
+			continue
+		}
+		missing := c.checkSegment(segment, resp)
+		if len(missing) > 0 {
+			targets[segment.GetID()] = missing
+			idSegments[segment.GetID()] = segment
+		}
+	}
+
+	segmentsToUpdate := typeutil.NewSet[int64]()
+	for _, segmentIDs := range lo.Chunk(lo.Keys(targets), MaxSegmentNumPerGetIndexInfoRPC) {
+		for _, segmentID := range segmentIDs {
+			segmentsToUpdate.Insert(segmentID)
+		}
+	}
+
+	log.Info("StatsChecker checkReplica", zap.Any("segmentsToUpdate", segmentsToUpdate))
+
+	tasks = lo.FilterMap(segmentsToUpdate.Collect(), func(segmentID int64, _ int) (task.Task, bool) {
+		return c.createSegmentUpdateTask(ctx, idSegments[segmentID], replica)
+	})
+
+	return tasks
+}
+
+func (c *StatsChecker) checkSegment(segment *meta.Segment, resp *milvuspb.DescribeCollectionResponse) (missFieldIDs []int64) {
+	var result []int64
+	for _, field := range resp.GetSchema().GetFields() {
+		h := typeutil.CreateFieldSchemaHelper(field)
+		if h.EnableJSONKeyIndex() {
+			log.Info("StatsChecker checkSegment", zap.Any("result", result), zap.Any("JsonIndexField", segment.JSONIndexField))
+			exists := false
+			for i := 0; i < len(segment.JSONIndexField); i++ {
+				if segment.JSONIndexField[i] == field.FieldID {
+					exists = true
+					break
+				}
+			}
+
+			if !exists {
+				result = append(result, field.FieldID)
+				continue
+			}
+		}
+	}
+	log.Info("StatsChecker checkSegment", zap.Any("result", result), zap.Any("segment", segment.GetJsonKeyStats()))
+	return result
+}
+
+func (c *StatsChecker) createSegmentUpdateTask(ctx context.Context, segment *meta.Segment, replica *meta.Replica) (task.Task, bool) {
+	action := task.NewSegmentActionWithScope(segment.Node, task.ActionTypeStatsUpdate, segment.GetInsertChannel(), segment.GetID(), querypb.DataScope_Historical)
+	t, err := task.NewSegmentTask(
+		ctx,
+		params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
+		c.ID(),
+		segment.GetCollectionID(),
+		replica,
+		action,
+	)
+	if err != nil {
+		log.Warn("create segment stats update task failed",
+			zap.Int64("collection", segment.GetCollectionID()),
+			zap.String("channel", segment.GetInsertChannel()),
+			zap.Int64("node", segment.Node),
+			zap.Error(err),
+		)
+		return nil, false
+	}
+	t.SetPriority(task.TaskPriorityLow)
+	t.SetReason("missing json stats")
+	return t, true
+}
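checkSegment above boils down to a set difference: JSON-capable fields in the schema minus the field IDs already present in the segment's loaded JSON index stats. A hedged Go sketch of that computation, with pared-down stand-in types rather than the real schemapb/meta structures:

```go
package main

import "fmt"

// fieldSchema stands in for schemapb.FieldSchema; IsJSON models what
// FieldSchemaHelper.EnableJSONKeyIndex reports (JSON data type).
type fieldSchema struct {
	FieldID int64
	IsJSON  bool
}

// missingJSONFields returns the JSON fields that have no loaded key
// stats on the segment, i.e. what checkSegment appends to result.
func missingJSONFields(fields []fieldSchema, loaded []int64) []int64 {
	loadedSet := make(map[int64]struct{}, len(loaded))
	for _, id := range loaded {
		loadedSet[id] = struct{}{}
	}
	var missing []int64
	for _, f := range fields {
		if !f.IsJSON { // only JSON fields qualify for key stats
			continue
		}
		if _, ok := loadedSet[f.FieldID]; !ok {
			missing = append(missing, f.FieldID)
		}
	}
	return missing
}

func main() {
	fields := []fieldSchema{{FieldID: 101, IsJSON: true}, {FieldID: 102, IsJSON: false}}
	fmt.Println(missingJSONFields(fields, nil))            // [101]
	fmt.Println(missingJSONFields(fields, []int64{101}))   // []
}
```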
diff --git a/internal/querycoordv2/checkers/stats_checker_test.go b/internal/querycoordv2/checkers/stats_checker_test.go
new file mode 100644
index 0000000000000..db2500fedb8bf
--- /dev/null
+++ b/internal/querycoordv2/checkers/stats_checker_test.go
@@ -0,0 +1,280 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package checkers + +import ( + "context" + "testing" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/kv" + "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +type StatsCheckerSuite struct { + suite.Suite + kv kv.MetaKv + checker *StatsChecker + meta *meta.Meta + broker *meta.MockBroker + nodeMgr *session.NodeManager + targetMgr *meta.MockTargetManager +} + +func (suite *StatsCheckerSuite) SetupSuite() { + paramtable.Init() +} + +func (suite *StatsCheckerSuite) SetupTest() { + var err error + config := params.GenerateEtcdConfig() + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd.GetAsBool(), + config.EtcdUseSSL.GetAsBool(), + config.Endpoints.GetAsStrings(), + config.EtcdTLSCert.GetValue(), + config.EtcdTLSKey.GetValue(), + config.EtcdTLSCACert.GetValue(), + config.EtcdTLSMinVersion.GetValue()) + suite.Require().NoError(err) + suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) + + // meta + store := querycoord.NewCatalog(suite.kv) + idAllocator := params.RandomIncrementIDAllocator() + suite.nodeMgr = session.NewNodeManager() + suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) + distManager := meta.NewDistributionManager() + suite.broker = meta.NewMockBroker(suite.T()) + + suite.targetMgr = meta.NewMockTargetManager(suite.T()) + suite.checker = NewStatsChecker(suite.meta, distManager, suite.broker, suite.nodeMgr, suite.targetMgr) + + suite.targetMgr.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, cid, sid int64, i3 int32) *datapb.SegmentInfo { + return &datapb.SegmentInfo{ + ID: sid, + Level: datapb.SegmentLevel_L1, + } + }).Maybe() +} + +func (suite *StatsCheckerSuite) TearDownTest() { + suite.kv.Close() +} + +func (suite *StatsCheckerSuite) TestLoadJsonIndex() { + checker := suite.checker + ctx := context.Background() + + // meta + coll := utils.CreateTestCollection(1, 1) + coll.FieldIndexID = map[int64]int64{101: 1000} + checker.meta.CollectionManager.PutCollection(ctx, coll) + checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(200, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", 
+ Hostname: "localhost", + })) + checker.meta.ResourceManager.HandleNodeUp(ctx, 1) + checker.meta.ResourceManager.HandleNodeUp(ctx, 2) + + // dist + checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel")) + + // broker + suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)). + Return(&milvuspb.DescribeCollectionResponse{ + Status: merr.Success(), + Schema: &schemapb.CollectionSchema{ + Name: "test_loadJsonIndex", + Fields: []*schemapb.FieldSchema{ + {FieldID: 101, DataType: schemapb.DataType_JSON, Name: "JSON"}, + }, + }, + CollectionID: 1, + CollectionName: "test_loadJsonIndex", + }, nil) + + tasks := checker.Check(context.Background()) + suite.Require().Len(tasks, 1) + + t := tasks[0] + suite.Require().Len(t.Actions(), 1) + + action, ok := t.Actions()[0].(*task.SegmentAction) + suite.Require().True(ok) + suite.EqualValues(200, t.ReplicaID()) + suite.Equal(task.ActionTypeStatsUpdate, action.Type()) + suite.EqualValues(2, action.GetSegmentID()) + + // test skip load json index for read only node + suite.nodeMgr.Stopping(1) + suite.nodeMgr.Stopping(2) + suite.meta.ResourceManager.HandleNodeStopping(ctx, 1) + suite.meta.ResourceManager.HandleNodeStopping(ctx, 2) + utils.RecoverAllCollection(suite.meta) + tasks = checker.Check(context.Background()) + suite.Require().Len(tasks, 0) +} + +func (suite *StatsCheckerSuite) TestJsonIndexNotMatch() { + checker := suite.checker + ctx := context.Background() + + // meta + coll := utils.CreateTestCollection(1, 1) + coll.FieldIndexID = map[int64]int64{101: 1000} + checker.meta.CollectionManager.PutCollection(ctx, coll) + checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(200, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) + checker.meta.ResourceManager.HandleNodeUp(ctx, 1) + checker.meta.ResourceManager.HandleNodeUp(ctx, 2) + + // dist + checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel")) + + // broker + suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)). 
+ Return(&milvuspb.DescribeCollectionResponse{ + Status: merr.Success(), + Schema: &schemapb.CollectionSchema{ + Name: "test_loadJsonIndex", + Fields: []*schemapb.FieldSchema{ + {FieldID: 101, DataType: schemapb.DataType_Int16, Name: "int"}, + }, + }, + CollectionID: 1, + CollectionName: "test_loadJsonIndex", + }, nil) + + tasks := checker.Check(context.Background()) + suite.Require().Len(tasks, 0) +} + +func (suite *StatsCheckerSuite) TestDescribeCollectionFailed() { + checker := suite.checker + ctx := context.Background() + + // meta + coll := utils.CreateTestCollection(1, 1) + coll.FieldIndexID = map[int64]int64{101: 1000} + checker.meta.CollectionManager.PutCollection(ctx, coll) + checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(200, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) + checker.meta.ResourceManager.HandleNodeUp(ctx, 1) + checker.meta.ResourceManager.HandleNodeUp(ctx, 2) + + // dist + checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel")) + + // broker + suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)). + Return(nil, errors.New("mocked error")) + + tasks := checker.Check(context.Background()) + suite.Require().Len(tasks, 0) +} + +func (suite *StatsCheckerSuite) TestCreateNewJsonIndex() { + checker := suite.checker + ctx := context.Background() + + // meta + coll := utils.CreateTestCollection(1, 1) + coll.FieldIndexID = map[int64]int64{101: 1000} + checker.meta.CollectionManager.PutCollection(ctx, coll) + checker.meta.ReplicaManager.Put(ctx, utils.CreateTestReplica(200, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 1, + Address: "localhost", + Hostname: "localhost", + })) + suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: 2, + Address: "localhost", + Hostname: "localhost", + })) + checker.meta.ResourceManager.HandleNodeUp(ctx, 1) + checker.meta.ResourceManager.HandleNodeUp(ctx, 2) + + // dist + segment := utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel") + segment.JSONIndexField = []int64{102} + checker.dist.SegmentDistManager.Update(1, segment) + + // broker + suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)). 
+ Return(&milvuspb.DescribeCollectionResponse{ + Status: merr.Success(), + Schema: &schemapb.CollectionSchema{ + Name: "test_loadJsonIndex", + Fields: []*schemapb.FieldSchema{ + {FieldID: 101, DataType: schemapb.DataType_JSON, Name: "JSON"}, + }, + }, + CollectionID: 1, + CollectionName: "test_loadJsonIndex", + }, nil) + + tasks := checker.Check(context.Background()) + suite.Len(tasks, 1) + suite.Len(tasks[0].Actions(), 1) + suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).Type(), task.ActionTypeStatsUpdate) +} + +func TestStatsChecker(t *testing.T) { + suite.Run(t, new(StatsCheckerSuite)) +} diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 828cedc6e5ce3..caec69800bc75 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -160,6 +160,7 @@ func (dh *distHandler) updateSegmentsDistribution(ctx context.Context, resp *que Version: s.GetVersion(), LastDeltaTimestamp: s.GetLastDeltaTimestamp(), IndexInfo: s.GetIndexInfo(), + JSONIndexField: s.GetFieldJsonIndexStats(), }) } diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index 85519b7770360..e8de376db9299 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -125,6 +125,7 @@ type Segment struct { Version int64 // Version is the timestamp of loading segment LastDeltaTimestamp uint64 // The timestamp of the last delta record IndexInfo map[int64]*querypb.FieldIndexInfo // index info of loaded segment + JSONIndexField []int64 // json index info of loaded segment } func SegmentFromInfo(info *datapb.SegmentInfo) *Segment { diff --git a/internal/querycoordv2/ops_service_test.go b/internal/querycoordv2/ops_service_test.go index db56c8ded85d4..ce263ef10ed23 100644 --- a/internal/querycoordv2/ops_service_test.go +++ b/internal/querycoordv2/ops_service_test.go @@ -184,7 +184,7 @@ func (suite *OpsServiceSuite) TestActiveCheckers() { resp, err = suite.server.ListCheckers(ctx, &querypb.ListCheckersRequest{}) suite.NoError(err) suite.True(merr.Ok(resp.Status)) - suite.Len(resp.GetCheckerInfos(), 5) + suite.Len(resp.GetCheckerInfos(), 6) resp4, err := suite.server.DeactivateChecker(ctx, &querypb.DeactivateCheckerRequest{ CheckerID: int32(utils.ChannelChecker), diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index dfbc4c44ddf52..1ab2be6200487 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -34,12 +34,14 @@ const ( ActionTypeGrow ActionType = iota + 1 ActionTypeReduce ActionTypeUpdate + ActionTypeStatsUpdate ) var ActionTypeName = map[ActionType]string{ - ActionTypeGrow: "Grow", - ActionTypeReduce: "Reduce", - ActionTypeUpdate: "Update", + ActionTypeGrow: "Grow", + ActionTypeReduce: "Reduce", + ActionTypeUpdate: "Update", + ActionTypeStatsUpdate: "StatsUpdate", } func (t ActionType) String() string { @@ -154,7 +156,7 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool return true } return action.rpcReturned.Load() - } else if action.Type() == ActionTypeUpdate { + } else if action.Type() == ActionTypeUpdate || action.Type() == ActionTypeStatsUpdate { return action.rpcReturned.Load() } diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index b66d26b0f722e..7ff26e8b6f99b 100644 --- a/internal/querycoordv2/task/executor.go +++ 
b/internal/querycoordv2/task/executor.go @@ -156,7 +156,7 @@ func (ex *Executor) removeTask(task Task, step int) { func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) { switch task.Actions()[step].Type() { - case ActionTypeGrow, ActionTypeUpdate: + case ActionTypeGrow, ActionTypeUpdate, ActionTypeStatsUpdate: ex.loadSegment(task, step) case ActionTypeReduce: @@ -469,6 +469,9 @@ func (ex *Executor) executeLeaderAction(task *LeaderTask, step int) { case ActionTypeUpdate: ex.updatePartStatsVersions(task, step) + + case ActionTypeStatsUpdate: + ex.updatePartStatsVersions(task, step) } } diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index d34a4eeaeb848..c6d4acf712923 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -47,13 +47,15 @@ const ( TaskTypeReduce TaskTypeMove TaskTypeUpdate + TaskTypeStatsUpdate ) var TaskTypeName = map[Type]string{ - TaskTypeGrow: "Grow", - TaskTypeReduce: "Reduce", - TaskTypeMove: "Move", - TaskTypeUpdate: "Update", + TaskTypeGrow: "Grow", + TaskTypeReduce: "Reduce", + TaskTypeMove: "Move", + TaskTypeUpdate: "Update", + TaskTypeStatsUpdate: "StatsUpdate", } type Type int32 diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go index c4f4df26e5332..400d945073cb3 100644 --- a/internal/querycoordv2/task/utils.go +++ b/internal/querycoordv2/task/utils.go @@ -95,6 +95,8 @@ func GetTaskType(task Task) Type { return TaskTypeReduce case task.Actions()[0].Type() == ActionTypeUpdate: return TaskTypeUpdate + case task.Actions()[0].Type() == ActionTypeStatsUpdate: + return TaskTypeStatsUpdate } return 0 } @@ -132,6 +134,10 @@ func packLoadSegmentRequest( loadScope = querypb.LoadScope_Index } + if action.Type() == ActionTypeStatsUpdate { + loadScope = querypb.LoadScope_Stats + } + if task.Source() == utils.LeaderChecker { loadScope = querypb.LoadScope_Delta } diff --git a/internal/querycoordv2/utils/checker.go b/internal/querycoordv2/utils/checker.go index 0234ff2e98d8a..b201837d26e6c 100644 --- a/internal/querycoordv2/utils/checker.go +++ b/internal/querycoordv2/utils/checker.go @@ -28,6 +28,7 @@ const ( IndexCheckerName = "index_checker" LeaderCheckerName = "leader_checker" ManualBalanceName = "manual_balance" + StatsCheckerName = "stats_checker" ) type CheckerType int32 @@ -39,6 +40,7 @@ const ( IndexChecker LeaderChecker ManualBalance + StatsChecker ) var checkerNames = map[CheckerType]string{ @@ -48,6 +50,7 @@ var checkerNames = map[CheckerType]string{ IndexChecker: IndexCheckerName, LeaderChecker: LeaderCheckerName, ManualBalance: ManualBalanceName, + StatsChecker: StatsCheckerName, } func (s CheckerType) String() string { diff --git a/internal/querynodev2/handlers.go b/internal/querynodev2/handlers.go index 9c3cc113de236..24d21a8063777 100644 --- a/internal/querynodev2/handlers.go +++ b/internal/querynodev2/handlers.go @@ -186,6 +186,45 @@ func (node *QueryNode) loadIndex(ctx context.Context, req *querypb.LoadSegmentsR return status } +func (node *QueryNode) loadStats(ctx context.Context, req *querypb.LoadSegmentsRequest) *commonpb.Status { + log := log.Ctx(ctx).With( + zap.Int64("collectionID", req.GetCollectionID()), + zap.Int64s("segmentIDs", lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })), + ) + + status := merr.Success() + log.Info("start to load stats") + + for _, info := range req.GetInfos() { + log := log.With(zap.Int64("segmentID", info.GetSegmentID())) + 
segment := node.manager.Segment.GetSealed(info.GetSegmentID())
+		if segment == nil {
+			log.Warn("segment not found for load stats operation")
+			continue
+		}
+		localSegment, ok := segment.(*segments.LocalSegment)
+		if !ok {
+			log.Warn("segment not local for load stats operation")
+			continue
+		}
+
+		if localSegment.IsLazyLoad() {
+			localSegment.SetLoadInfo(info)
+			localSegment.SetNeedUpdatedVersion(req.GetVersion())
+			node.manager.DiskCache.MarkItemNeedReload(ctx, localSegment.ID())
+			return nil
+		}
+		err := node.loader.LoadJSONIndex(ctx, localSegment, info)
+		if err != nil {
+			log.Warn("failed to load stats", zap.Error(err))
+			status = merr.Status(err)
+			break
+		}
+	}
+
+	return status
+}
+
 func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryRequest, channel string) (*internalpb.RetrieveResults, error) {
 	msgID := req.Req.Base.GetMsgID()
 	traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()
diff --git a/internal/querynodev2/segments/mock_loader.go b/internal/querynodev2/segments/mock_loader.go
index d50d52b9c077a..34e7b9a5768db 100644
--- a/internal/querynodev2/segments/mock_loader.go
+++ b/internal/querynodev2/segments/mock_loader.go
@@ -355,6 +355,54 @@ func (_c *MockLoader_LoadIndex_Call) RunAndReturn(run func(context.Context, Segm
 	return _c
 }
 
+// LoadJSONIndex provides a mock function with given fields: ctx, segment, info
+func (_m *MockLoader) LoadJSONIndex(ctx context.Context, segment Segment, info *querypb.SegmentLoadInfo) error {
+	ret := _m.Called(ctx, segment, info)
+
+	if len(ret) == 0 {
+		panic("no return value specified for LoadJSONIndex")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(context.Context, Segment, *querypb.SegmentLoadInfo) error); ok {
+		r0 = rf(ctx, segment, info)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// MockLoader_LoadJSONIndex_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LoadJSONIndex'
+type MockLoader_LoadJSONIndex_Call struct {
+	*mock.Call
+}
+
+// LoadJSONIndex is a helper method to define mock.On call
+//   - ctx context.Context
+//   - segment Segment
+//   - info *querypb.SegmentLoadInfo
+func (_e *MockLoader_Expecter) LoadJSONIndex(ctx interface{}, segment interface{}, info interface{}) *MockLoader_LoadJSONIndex_Call {
+	return &MockLoader_LoadJSONIndex_Call{Call: _e.mock.On("LoadJSONIndex", ctx, segment, info)}
+}
+
+func (_c *MockLoader_LoadJSONIndex_Call) Run(run func(ctx context.Context, segment Segment, info *querypb.SegmentLoadInfo)) *MockLoader_LoadJSONIndex_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(Segment), args[2].(*querypb.SegmentLoadInfo))
+	})
+	return _c
+}
+
+func (_c *MockLoader_LoadJSONIndex_Call) Return(_a0 error) *MockLoader_LoadJSONIndex_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockLoader_LoadJSONIndex_Call) RunAndReturn(run func(context.Context, Segment, *querypb.SegmentLoadInfo) error) *MockLoader_LoadJSONIndex_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // LoadLazySegment provides a mock function with given fields: ctx, segment, loadInfo
 func (_m *MockLoader) LoadLazySegment(ctx context.Context, segment Segment, loadInfo *querypb.SegmentLoadInfo) error {
 	ret := _m.Called(ctx, segment, loadInfo)
diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go
index 1af3012ed3038..9cb06d475e937 100644
--- a/internal/querynodev2/segments/mock_segment.go
+++ b/internal/querynodev2/segments/mock_segment.go
@@ -363,6 +363,53 @@ func (_c 
*MockSegment_GetBM25Stats_Call) RunAndReturn(run func() map[int64]*stor return _c } +// GetFieldJSONIndexStats provides a mock function with given fields: +func (_m *MockSegment) GetFieldJSONIndexStats() []int64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetFieldJSONIndexStats") + } + + var r0 []int64 + if rf, ok := ret.Get(0).(func() []int64); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + return r0 +} + +// MockSegment_GetFieldJSONIndexStats_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetFieldJSONIndexStats' +type MockSegment_GetFieldJSONIndexStats_Call struct { + *mock.Call +} + +// GetFieldJSONIndexStats is a helper method to define mock.On call +func (_e *MockSegment_Expecter) GetFieldJSONIndexStats() *MockSegment_GetFieldJSONIndexStats_Call { + return &MockSegment_GetFieldJSONIndexStats_Call{Call: _e.mock.On("GetFieldJSONIndexStats")} +} + +func (_c *MockSegment_GetFieldJSONIndexStats_Call) Run(run func()) *MockSegment_GetFieldJSONIndexStats_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSegment_GetFieldJSONIndexStats_Call) Return(_a0 []int64) *MockSegment_GetFieldJSONIndexStats_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_GetFieldJSONIndexStats_Call) RunAndReturn(run func() []int64) *MockSegment_GetFieldJSONIndexStats_Call { + _c.Call.Return(run) + return _c +} + // GetIndex provides a mock function with given fields: fieldID func (_m *MockSegment) GetIndex(fieldID int64) *IndexedFieldInfo { ret := _m.Called(fieldID) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 7a04b7c5e13ea..077aefa3aa309 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -279,6 +279,7 @@ type LocalSegment struct { lastDeltaTimestamp *atomic.Uint64 fields *typeutil.ConcurrentMap[int64, *FieldInfo] fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] + fieldJSONStats []int64 } func NewSegment(ctx context.Context, @@ -1089,9 +1090,19 @@ func (s *LocalSegment) LoadTextIndex(ctx context.Context, textLogs *datapb.TextI return HandleCStatus(ctx, &status, "LoadTextIndex failed") } -func (s *LocalSegment) LoadJsonKeyIndex(ctx context.Context, jsonKeyStats *datapb.JsonKeyStats, schemaHelper *typeutil.SchemaHelper) error { +func (s *LocalSegment) LoadJSONKeyIndex(ctx context.Context, jsonKeyStats *datapb.JsonKeyStats, schemaHelper *typeutil.SchemaHelper) error { log.Ctx(ctx).Info("load json key index", zap.Int64("field id", jsonKeyStats.GetFieldID()), zap.Any("json key logs", jsonKeyStats)) - + exists := false + for _, field := range s.fieldJSONStats { + if field == jsonKeyStats.GetFieldID() { + exists = true + break + } + } + if exists { + log.Warn("JsonKeyIndexStats already loaded") + return nil + } f, err := schemaHelper.GetFieldFromID(jsonKeyStats.GetFieldID()) if err != nil { return err @@ -1118,9 +1129,8 @@ func (s *LocalSegment) LoadJsonKeyIndex(ctx context.Context, jsonKeyStats *datap status = C.LoadJsonKeyIndex(traceCtx.ctx, s.ptr, (*C.uint8_t)(unsafe.Pointer(&marshaled[0])), (C.uint64_t)(len(marshaled))) return nil, nil }).Await() - + s.fieldJSONStats = append(s.fieldJSONStats, jsonKeyStats.GetFieldID()) return HandleCStatus(ctx, &status, "Load JsonKeyStats failed") - } func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, info *LoadIndexInfo) error { 
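The duplicate-load guard just added to LoadJSONKeyIndex above is a linear scan over the small fieldJSONStats slice. An equivalent set-based formulation, shown only as an illustration (it is not the PR's code, and like the original it assumes callers serialize access to the segment):

```go
package main

import "fmt"

// loadedJSONStats records field IDs whose JSON key stats are already
// in memory, so a repeated LoadJSONKeyIndex call can return early.
type loadedJSONStats map[int64]struct{}

// tryMark reports whether fieldID still needs loading, marking it as
// loaded when it does. Not goroutine-safe; callers must serialize.
func (l loadedJSONStats) tryMark(fieldID int64) bool {
	if _, ok := l[fieldID]; ok {
		return false // already loaded: skip the cgo load entirely
	}
	l[fieldID] = struct{}{}
	return true
}

func main() {
	loaded := loadedJSONStats{}
	fmt.Println(loaded.tryMark(101)) // true: first load proceeds
	fmt.Println(loaded.tryMark(101)) // false: duplicate load skipped
}
```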
@@ -1380,3 +1390,7 @@ func (s *LocalSegment) indexNeedLoadRawData(schema *schemapb.CollectionSchema, i } return !typeutil.IsVectorType(fieldSchema.DataType) && s.HasRawData(indexInfo.IndexInfo.FieldID), nil } + +func (s *LocalSegment) GetFieldJSONIndexStats() []int64 { + return s.fieldJSONStats +} diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index 400886ccd5edf..950fe4034e559 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -102,4 +102,6 @@ type Segment interface { // lazy load related NeedUpdatedVersion() int64 RemoveUnusedFieldFiles() error + + GetFieldJSONIndexStats() []int64 } diff --git a/internal/querynodev2/segments/segment_l0.go b/internal/querynodev2/segments/segment_l0.go index cab1f64b7645a..f7b16af500b86 100644 --- a/internal/querynodev2/segments/segment_l0.go +++ b/internal/querynodev2/segments/segment_l0.go @@ -188,3 +188,7 @@ func (s *L0Segment) Release(ctx context.Context, opts ...releaseOption) { func (s *L0Segment) RemoveUnusedFieldFiles() error { panic("not implemented") } + +func (s *L0Segment) GetFieldJSONIndexStats() []int64 { + return nil +} diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index f0d51c9b611ad..1c2f98c1daa6f 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -93,6 +93,10 @@ type Loader interface { segment Segment, loadInfo *querypb.SegmentLoadInfo, ) error + + LoadJSONIndex(ctx context.Context, + segment Segment, + info *querypb.SegmentLoadInfo) error } type ResourceEstimate struct { @@ -821,11 +825,11 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu } for _, info := range jsonKeyStats { - if err := segment.LoadJsonKeyIndex(ctx, info, schemaHelper); err != nil { + if err := segment.LoadJSONKeyIndex(ctx, info, schemaHelper); err != nil { return err } } - loadJsonKeyIndexesSpan := tr.RecordSpan() + loadJSONKeyIndexesSpan := tr.RecordSpan() // 4. 
rectify entries number for binlog in very rare cases // https://github.com/milvus-io/milvus/23654 @@ -840,7 +844,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu zap.Duration("loadRawDataSpan", loadRawDataSpan), zap.Duration("patchEntryNumberSpan", patchEntryNumberSpan), zap.Duration("loadTextIndexesSpan", loadTextIndexesSpan), - zap.Duration("loadJsonKeyIndexSpan", loadJsonKeyIndexesSpan), + zap.Duration("loadJsonKeyIndexSpan", loadJSONKeyIndexesSpan), ) return nil } @@ -1684,6 +1688,35 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, return loader.waitSegmentLoadDone(ctx, commonpb.SegmentState_SegmentStateNone, []int64{loadInfo.GetSegmentID()}, version) } +func (loader *segmentLoader) LoadJSONIndex(ctx context.Context, + seg Segment, + loadInfo *querypb.SegmentLoadInfo, +) error { + segment, ok := seg.(*LocalSegment) + if !ok { + return merr.WrapErrParameterInvalid("LocalSegment", fmt.Sprintf("%T", seg)) + } + + collection := segment.GetCollection() + schemaHelper, _ := typeutil.CreateSchemaHelper(collection.Schema()) + + jsonKeyIndexInfo := make(map[int64]*datapb.JsonKeyStats, len(loadInfo.GetJsonKeyStatsLogs())) + for _, fieldStatsLog := range loadInfo.GetJsonKeyStatsLogs() { + jsonKeyLog, ok := jsonKeyIndexInfo[fieldStatsLog.FieldID] + if !ok { + jsonKeyIndexInfo[fieldStatsLog.FieldID] = fieldStatsLog + } else if fieldStatsLog.GetVersion() > jsonKeyLog.GetVersion() { + jsonKeyIndexInfo[fieldStatsLog.FieldID] = fieldStatsLog + } + } + for _, info := range jsonKeyIndexInfo { + if err := segment.LoadJSONKeyIndex(ctx, info, schemaHelper); err != nil { + return err + } + } + return nil +} + func getBinlogDataDiskSize(fieldBinlog *datapb.FieldBinlog) int64 { fieldSize := int64(0) for _, binlog := range fieldBinlog.Binlogs { diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 1eca7efe950cd..6d0e19c1f3501 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -484,6 +484,9 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen if req.GetLoadScope() == querypb.LoadScope_Index { return node.loadIndex(ctx, req), nil } + if req.GetLoadScope() == querypb.LoadScope_Stats { + return node.loadStats(ctx, req), nil + } // Actual load segment log.Info("start to load segments...") @@ -1160,6 +1163,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get sealedSegments := node.manager.Segment.GetBy(segments.WithType(commonpb.SegmentState_Sealed)) segmentVersionInfos := make([]*querypb.SegmentVersionInfo, 0, len(sealedSegments)) for _, s := range sealedSegments { + log.Info("GetDataDistribution", zap.Any("JsonKeyStatsLogs", s.LoadInfo().GetJsonKeyStatsLogs()), zap.Any("Indexes", s.Indexes())) segmentVersionInfos = append(segmentVersionInfos, &querypb.SegmentVersionInfo{ ID: s.ID(), Collection: s.Collection(), @@ -1172,6 +1176,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get IndexInfo: lo.SliceToMap(s.Indexes(), func(info *segments.IndexedFieldInfo) (int64, *querypb.FieldIndexInfo) { return info.IndexInfo.FieldID, info.IndexInfo }), + FieldJsonIndexStats: s.GetFieldJSONIndexStats(), }) } diff --git a/internal/util/indexcgowrapper/index.go b/internal/util/indexcgowrapper/index.go index 255fc99d43e4a..a813be05a5cf3 100644 --- a/internal/util/indexcgowrapper/index.go +++ b/internal/util/indexcgowrapper/index.go @@ -163,7 +163,7 @@ func CreateTextIndex(ctx context.Context, buildIndexInfo 
*indexcgopb.BuildIndexI return res, nil } -func CreateJsonKeyIndex(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (map[string]int64, error) { +func CreateJSONKeyIndex(ctx context.Context, buildIndexInfo *indexcgopb.BuildIndexInfo) (map[string]int64, error) { buildIndexInfoBlob, err := proto.Marshal(buildIndexInfo) if err != nil { log.Ctx(ctx).Warn("marshal buildIndexInfo failed", diff --git a/pkg/util/typeutil/field_schema.go b/pkg/util/typeutil/field_schema.go index 460b9adf25efc..4b14fd40078de 100644 --- a/pkg/util/typeutil/field_schema.go +++ b/pkg/util/typeutil/field_schema.go @@ -53,11 +53,8 @@ func (h *FieldSchemaHelper) EnableMatch() bool { return err == nil && enable } -func (h *FieldSchemaHelper) EnableJsonKeyIndex() bool { - if IsJSONType(h.schema.GetDataType()) { - return true - } - return false +func (h *FieldSchemaHelper) EnableJSONKeyIndex() bool { + return IsJSONType(h.schema.GetDataType()) } func (h *FieldSchemaHelper) EnableAnalyzer() bool {
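Two loader-side details earlier in this diff are worth calling out: LoadSegments routes LoadScope_Stats requests to loadStats, and segmentLoader.LoadJSONIndex keeps only the highest-version JsonKeyStats log per field before loading. The version-dedup step as a self-contained Go sketch, using a pared-down stand-in for datapb.JsonKeyStats:

```go
package main

import "fmt"

// statsLog stands in for datapb.JsonKeyStats; only the fields the
// dedup loop inspects are modeled.
type statsLog struct {
	FieldID int64
	Version int64
}

// latestPerField mirrors the loop in LoadJSONIndex: for each field,
// keep only the stats log with the highest version.
func latestPerField(logs []statsLog) map[int64]statsLog {
	latest := make(map[int64]statsLog, len(logs))
	for _, l := range logs {
		if cur, ok := latest[l.FieldID]; !ok || l.Version > cur.Version {
			latest[l.FieldID] = l
		}
	}
	return latest
}

func main() {
	logs := []statsLog{{101, 1}, {101, 3}, {102, 2}}
	fmt.Println(latestPerField(logs)) // field 101 resolves to version 3
}
```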