diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h index c8e10347db8f4..8c76f5cb2312a 100644 --- a/internal/core/src/common/Consts.h +++ b/internal/core/src/common/Consts.h @@ -77,3 +77,6 @@ const int64_t DEFAULT_BITMAP_INDEX_BUILD_MODE_BOUND = 500; const int64_t DEFAULT_HYBRID_INDEX_BITMAP_CARDINALITY_LIMIT = 100; const size_t MARISA_NULL_KEY_ID = -1; + +const std::string JSON_CAST_TYPE = "json_cast_type"; +const std::string JSON_PATH = "json_path"; \ No newline at end of file diff --git a/internal/core/src/common/FieldDataInterface.h b/internal/core/src/common/FieldDataInterface.h index 5703acce16106..f48823ecfd55f 100644 --- a/internal/core/src/common/FieldDataInterface.h +++ b/internal/core/src/common/FieldDataInterface.h @@ -641,6 +641,20 @@ class FieldDataJsonImpl : public FieldDataImpl { } length_ += n; } + + // only for test + void + add_json_data(const std::vector& json) { + std::lock_guard lck(tell_mutex_); + if (length_ + json.size() > get_num_rows()) { + resize_field_data(length_ + json.size()); + } + + for (size_t i = 0; i < json.size(); ++i) { + data_[length_ + i] = json[i]; + } + length_ += json.size(); + } }; class FieldDataSparseVectorImpl diff --git a/internal/core/src/common/FieldMeta.h b/internal/core/src/common/FieldMeta.h index 92356b15724c2..55b2afd6934d7 100644 --- a/internal/core/src/common/FieldMeta.h +++ b/internal/core/src/common/FieldMeta.h @@ -162,6 +162,11 @@ class FieldMeta { return IsVectorDataType(type_); } + bool + is_json() const { + return type_ == DataType::JSON; + } + bool is_string() const { return IsStringDataType(type_); diff --git a/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h b/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h index 0ffb83a936cbf..7be53523f94e1 100644 --- a/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h +++ b/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h @@ -447,6 +447,7 @@ class PhyBinaryArithOpEvalRangeExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr) { diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.h b/internal/core/src/exec/expression/BinaryRangeExpr.h index 1babfc6fd044e..0ca4fac9e2805 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.h +++ b/internal/core/src/exec/expression/BinaryRangeExpr.h @@ -229,6 +229,7 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr) { diff --git a/internal/core/src/exec/expression/ExistsExpr.h b/internal/core/src/exec/expression/ExistsExpr.h index dc00f883c7400..020a23e1d9b81 100644 --- a/internal/core/src/exec/expression/ExistsExpr.h +++ b/internal/core/src/exec/expression/ExistsExpr.h @@ -47,6 +47,7 @@ class PhyExistsFilterExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr) { diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h index f2bbc8cd7f6bb..37816e942a89e 100644 --- a/internal/core/src/exec/expression/Expr.h +++ b/internal/core/src/exec/expression/Expr.h @@ -20,12 +20,15 @@ #include #include +#include "common/FieldDataInterface.h" +#include "common/Json.h" #include "common/Types.h" #include "exec/expression/EvalCtx.h" #include "exec/expression/VectorFunction.h" #include "exec/expression/Utils.h" #include "exec/QueryContext.h" #include "expr/ITypeExpr.h" +#include "log/Log.h" #include "query/PlanProto.h" namespace milvus { @@ -109,12 +112,15 @@ class SegmentExpr : public Expr { SegmentExpr(const std::vector&& input, const std::string& name, const segcore::SegmentInternalInterface* segment, - const FieldId& field_id, + const FieldId field_id, + const std::vector nested_path, int64_t active_count, int64_t batch_size) : Expr(DataType::BOOL, std::move(input), name), segment_(segment), field_id_(field_id), + nested_path_(nested_path), + active_count_(active_count), batch_size_(batch_size) { size_per_chunk_ = segment_->size_per_chunk(); @@ -129,6 +135,7 @@ class SegmentExpr : public Expr { InitSegmentExpr() { auto& schema = segment_->get_schema(); auto& field_meta = schema[field_id_]; + field_type_ = field_meta.get_data_type(); if (schema.get_primary_field_id().has_value() && schema.get_primary_field_id().value() == field_id_ && @@ -137,9 +144,16 @@ class SegmentExpr : public Expr { pk_type_ = field_meta.get_data_type(); } - is_index_mode_ = segment_->HasIndex(field_id_); - if (is_index_mode_) { - num_index_chunk_ = segment_->num_chunk_index(field_id_); + if (field_meta.get_data_type() == DataType::JSON) { + auto pointer = milvus::Json::pointer(nested_path_); + if (is_index_mode_ = segment_->HasIndex(field_id_, pointer)) { + num_index_chunk_ = 1; + } + } else { + is_index_mode_ = segment_->HasIndex(field_id_); + if (is_index_mode_) { + num_index_chunk_ = segment_->num_chunk_index(field_id_); + } } // if index not include raw data, also need load data if (segment_->HasFieldData(field_id_)) { @@ -767,9 +781,21 @@ class SegmentExpr : public Expr { // It avoids indexing execute for every batch because indexing // executing costs quite much time. if (cached_index_chunk_id_ != i) { - const Index& index = - segment_->chunk_scalar_index(field_id_, i); - auto* index_ptr = const_cast(&index); + Index* index_ptr = nullptr; + + if (field_type_ == DataType::JSON) { + auto pointer = milvus::Json::pointer(nested_path_); + + const Index& index = + segment_->chunk_scalar_index( + field_id_, pointer, i); + index_ptr = const_cast(&index); + } else { + const Index& index = + segment_->chunk_scalar_index(field_id_, + i); + index_ptr = const_cast(&index); + } cached_index_chunk_res_ = std::move(func(index_ptr, values...)); auto valid_result = index_ptr->IsNotNull(); cached_index_chunk_valid_res_ = std::move(valid_result); @@ -1067,6 +1093,9 @@ class SegmentExpr : public Expr { DataType pk_type_; int64_t batch_size_; + std::vector nested_path_; + DataType field_type_; + bool is_index_mode_{false}; bool is_data_mode_{false}; // sometimes need to skip index and using raw data diff --git a/internal/core/src/exec/expression/JsonContainsExpr.h b/internal/core/src/exec/expression/JsonContainsExpr.h index a0c8848cba188..b3b0b1566085b 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.h +++ b/internal/core/src/exec/expression/JsonContainsExpr.h @@ -40,6 +40,7 @@ class PhyJsonContainsFilterExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr) { diff --git a/internal/core/src/exec/expression/TermExpr.h b/internal/core/src/exec/expression/TermExpr.h index 19f03b131b9c3..b5660374b1c2f 100644 --- a/internal/core/src/exec/expression/TermExpr.h +++ b/internal/core/src/exec/expression/TermExpr.h @@ -61,6 +61,7 @@ class PhyTermFilterExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr), diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp index 725af0854049a..fd392ba5d54db 100644 --- a/internal/core/src/exec/expression/UnaryExpr.cpp +++ b/internal/core/src/exec/expression/UnaryExpr.cpp @@ -17,6 +17,9 @@ #include "UnaryExpr.h" #include #include "common/Json.h" +#include "common/Types.h" +#include "common/type_c.h" +#include "log/Log.h" namespace milvus { namespace exec { @@ -191,26 +194,50 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { } case DataType::JSON: { auto val_type = expr_->val_.val_case(); - switch (val_type) { - case proto::plan::GenericValue::ValCase::kBoolVal: - result = ExecRangeVisitorImplJson(input); - break; - case proto::plan::GenericValue::ValCase::kInt64Val: - result = ExecRangeVisitorImplJson(input); - break; - case proto::plan::GenericValue::ValCase::kFloatVal: - result = ExecRangeVisitorImplJson(input); - break; - case proto::plan::GenericValue::ValCase::kStringVal: - result = ExecRangeVisitorImplJson(input); - break; - case proto::plan::GenericValue::ValCase::kArrayVal: - result = - ExecRangeVisitorImplJson(input); - break; - default: - PanicInfo( - DataTypeInvalid, "unknown data type: {}", val_type); + if (CanUseIndexForJson() && !has_offset_input_) { + switch (val_type) { + case proto::plan::GenericValue::ValCase::kBoolVal: + result = ExecRangeVisitorImplForIndex(); + break; + case proto::plan::GenericValue::ValCase::kInt64Val: + result = ExecRangeVisitorImplForIndex(); + break; + case proto::plan::GenericValue::ValCase::kFloatVal: + result = ExecRangeVisitorImplForIndex(); + break; + case proto::plan::GenericValue::ValCase::kStringVal: + result = ExecRangeVisitorImplForIndex(); + break; + case proto::plan::GenericValue::ValCase::kArrayVal: + result = + ExecRangeVisitorImplForIndex(); + break; + default: + PanicInfo( + DataTypeInvalid, "unknown data type: {}", val_type); + } + } else { + switch (val_type) { + case proto::plan::GenericValue::ValCase::kBoolVal: + result = ExecRangeVisitorImplJson(input); + break; + case proto::plan::GenericValue::ValCase::kInt64Val: + result = ExecRangeVisitorImplJson(input); + break; + case proto::plan::GenericValue::ValCase::kFloatVal: + result = ExecRangeVisitorImplJson(input); + break; + case proto::plan::GenericValue::ValCase::kStringVal: + result = ExecRangeVisitorImplJson(input); + break; + case proto::plan::GenericValue::ValCase::kArrayVal: + result = + ExecRangeVisitorImplJson(input); + break; + default: + PanicInfo( + DataTypeInvalid, "unknown data type: {}", val_type); + } } break; } @@ -276,144 +303,145 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray(OffsetVector* input) { if (expr_->column_.nested_path_.size() > 0) { index = std::stoi(expr_->column_.nested_path_[0]); } - auto execute_sub_batch = [op_type]( - const milvus::ArrayView* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ValueType val, - int index) { - switch (op_type) { - case proto::plan::GreaterThan: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::GreaterEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::LessThan: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::LessEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::Equal: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::NotEqual: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::PrefixMatch: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; - } - case proto::plan::Match: { - UnaryElementFuncForArray - func; - func(data, - valid_data, - size, - val, - index, - res, - valid_res, - offsets); - break; + auto execute_sub_batch = + [op_type]( + const milvus::ArrayView* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ValueType val, + int index) { + switch (op_type) { + case proto::plan::GreaterThan: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::GreaterEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::LessThan: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::LessEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::Equal: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::NotEqual: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::PrefixMatch: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + case proto::plan::Match: { + UnaryElementFuncForArray + func; + func(data, + valid_data, + size, + val, + index, + res, + valid_res, + offsets); + break; + } + default: + PanicInfo( + OpTypeInvalid, + fmt::format( + "unsupported operator type for unary expr: {}", + op_type)); } - default: - PanicInfo( - OpTypeInvalid, - fmt::format("unsupported operator type for unary expr: {}", - op_type)); - } - }; + }; int64_t processed_size; if (has_offset_input_) { processed_size = @@ -479,7 +507,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) { }; } else { auto size_per_chunk = segment_->size_per_chunk(); - retrieve = [ size_per_chunk, this ](int64_t offset) -> auto { + retrieve = [size_per_chunk, this](int64_t offset) -> auto { auto chunk_idx = offset / size_per_chunk; auto chunk_offset = offset % size_per_chunk; const auto& chunk = @@ -598,15 +626,15 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson(OffsetVector* input) { res[i] = (cmp); \ } while (false) - auto execute_sub_batch = - [ op_type, pointer ]( - const milvus::Json* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - ExprValueType val) { + auto execute_sub_batch = [op_type, pointer]( + const milvus::Json* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + ExprValueType val) { switch (op_type) { case proto::plan::GreaterThan: { for (size_t i = 0; i < size; ++i) { @@ -978,13 +1006,13 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplForData(OffsetVector* input) { auto execute_sub_batch = [expr_type]( - const T* data, - const bool* valid_data, - const int32_t* offsets, - const int size, - TargetBitmapView res, - TargetBitmapView valid_res, - IndexInnerType val) { + const T* data, + const bool* valid_data, + const int32_t* offsets, + const int size, + TargetBitmapView res, + TargetBitmapView valid_res, + IndexInnerType val) { switch (expr_type) { case proto::plan::GreaterThan: { UnaryElementFunc func; @@ -1085,6 +1113,13 @@ PhyUnaryRangeFilterExpr::CanUseIndex() { return res; } +bool +PhyUnaryRangeFilterExpr::CanUseIndexForJson() { + use_index_ = segment_->HasIndex( + field_id_, milvus::Json::pointer(expr_->column_.nested_path_)); + return use_index_; +} + VectorPtr PhyUnaryRangeFilterExpr::ExecTextMatch() { using Index = index::TextMatchIndex; diff --git a/internal/core/src/exec/expression/UnaryExpr.h b/internal/core/src/exec/expression/UnaryExpr.h index 159fe5abb4091..27547ca651625 100644 --- a/internal/core/src/exec/expression/UnaryExpr.h +++ b/internal/core/src/exec/expression/UnaryExpr.h @@ -320,6 +320,7 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr) { @@ -378,6 +379,9 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr { bool CanUseIndexForArray(); + bool + CanUseIndexForJson(); + VectorPtr ExecTextMatch(); diff --git a/internal/core/src/index/IndexFactory.cpp b/internal/core/src/index/IndexFactory.cpp index b3e73024bac3a..825ef5bfee64b 100644 --- a/internal/core/src/index/IndexFactory.cpp +++ b/internal/core/src/index/IndexFactory.cpp @@ -15,11 +15,15 @@ // limitations under the License. #include "index/IndexFactory.h" +#include +#include #include "common/EasyAssert.h" +#include "common/FieldDataInterface.h" #include "common/Types.h" #include "index/VectorMemIndex.h" #include "index/Utils.h" #include "index/Meta.h" +#include "indexbuilder/JsonInvertedIndexCreator.h" #include "knowhere/utils.h" #include "index/VectorDiskIndex.h" @@ -29,6 +33,8 @@ #include "index/InvertedIndexTantivy.h" #include "index/HybridScalarIndex.h" #include "knowhere/comp/knowhere_check.h" +#include "log/Log.h" +#include "pb/schema.pb.h" namespace milvus::index { @@ -357,6 +363,49 @@ IndexFactory::CreateComplexScalarIndex( PanicInfo(Unsupported, "Complex index not supported now"); } +IndexBasePtr +IndexFactory::CreateJsonIndex( + IndexType index_type, + DataType cast_dtype, + const std::string& nested_path, + const storage::FileManagerContext& file_manager_context) { + AssertInfo(index_type == INVERTED_INDEX_TYPE, + "Invalid index type for json index"); + switch (cast_dtype) { + case DataType::BOOL: + return std::make_unique< + indexbuilder::JsonInvertedIndexCreator>( + proto::schema::DataType::Bool, + nested_path, + file_manager_context); + case milvus::DataType::INT8: + case milvus::DataType::INT16: + case milvus::DataType::INT32: + case DataType::INT64: + return std::make_unique< + indexbuilder::JsonInvertedIndexCreator>( + proto::schema::DataType::Int64, + nested_path, + file_manager_context); + case DataType::FLOAT: + case DataType::DOUBLE: + return std::make_unique< + indexbuilder::JsonInvertedIndexCreator>( + proto::schema::DataType::Double, + nested_path, + file_manager_context); + case DataType::STRING: + case DataType::VARCHAR: + return std::make_unique< + indexbuilder::JsonInvertedIndexCreator>( + proto::schema::DataType::String, + nested_path, + file_manager_context); + default: + PanicInfo(DataTypeInvalid, "Invalid data type:{}", cast_dtype); + } +} + IndexBasePtr IndexFactory::CreateScalarIndex( const CreateIndexInfo& create_index_info, @@ -379,8 +428,10 @@ IndexFactory::CreateScalarIndex( file_manager_context); } case DataType::JSON: { - return CreateComplexScalarIndex(create_index_info.index_type, - file_manager_context); + return CreateJsonIndex(create_index_info.index_type, + create_index_info.json_cast_type, + create_index_info.json_path, + file_manager_context); } default: PanicInfo(DataTypeInvalid, "Invalid data type:{}", data_type); diff --git a/internal/core/src/index/IndexFactory.h b/internal/core/src/index/IndexFactory.h index b5a6d408ba1d8..f56e70fdfbd0c 100644 --- a/internal/core/src/index/IndexFactory.h +++ b/internal/core/src/index/IndexFactory.h @@ -21,6 +21,7 @@ #include #include +#include "common/Types.h" #include "common/type_c.h" #include "index/Index.h" #include "index/ScalarIndex.h" @@ -103,6 +104,13 @@ class IndexFactory { const storage::FileManagerContext& file_manager_context = storage::FileManagerContext()); + IndexBasePtr + CreateJsonIndex(IndexType index_type, + DataType cast_dtype, + const std::string& nested_path, + const storage::FileManagerContext& file_manager_context = + storage::FileManagerContext()); + IndexBasePtr CreateScalarIndex(const CreateIndexInfo& create_index_info, const storage::FileManagerContext& file_manager_context = diff --git a/internal/core/src/index/IndexInfo.h b/internal/core/src/index/IndexInfo.h index f925de1e4ae99..7768e4c9a5641 100644 --- a/internal/core/src/index/IndexInfo.h +++ b/internal/core/src/index/IndexInfo.h @@ -26,6 +26,8 @@ struct CreateIndexInfo { IndexVersion index_engine_version; std::string field_name; int64_t dim; + DataType json_cast_type; + std::string json_path; }; } // namespace milvus::index diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp index 0072c3de5d0c8..6bfcd5f4dd95e 100644 --- a/internal/core/src/index/InvertedIndexTantivy.cpp +++ b/internal/core/src/index/InvertedIndexTantivy.cpp @@ -29,36 +29,6 @@ namespace milvus::index { constexpr const char* TMP_INVERTED_INDEX_PREFIX = "/tmp/milvus/inverted-index/"; -inline TantivyDataType -get_tantivy_data_type(proto::schema::DataType data_type) { - switch (data_type) { - case proto::schema::DataType::Bool: { - return TantivyDataType::Bool; - } - - case proto::schema::DataType::Int8: - case proto::schema::DataType::Int16: - case proto::schema::DataType::Int32: - case proto::schema::DataType::Int64: { - return TantivyDataType::I64; - } - - case proto::schema::DataType::Float: - case proto::schema::DataType::Double: { - return TantivyDataType::F64; - } - - case proto::schema::DataType::String: - case proto::schema::DataType::VarChar: { - return TantivyDataType::Keyword; - } - - default: - PanicInfo(ErrorCode::NotImplemented, - fmt::format("not implemented data type: {}", data_type)); - } -} - inline TantivyDataType get_tantivy_data_type(const proto::schema::FieldSchema& schema) { switch (schema.data_type()) { @@ -300,7 +270,6 @@ template const TargetBitmap InvertedIndexTantivy::Range(T value, OpType op) { TargetBitmap bitset(Count()); - switch (op) { case OpType::LessThan: { auto array = wrapper_->upper_bound_range_query(value, false); @@ -478,6 +447,11 @@ InvertedIndexTantivy::BuildWithFieldData( break; } + case proto::schema::DataType::JSON: { + build_index_for_json(field_datas); + break; + } + default: PanicInfo(ErrorCode::NotImplemented, fmt::format("Inverted index not supported on {}", diff --git a/internal/core/src/index/InvertedIndexTantivy.h b/internal/core/src/index/InvertedIndexTantivy.h index e4bcccb0c5b72..7916ce39bcef7 100644 --- a/internal/core/src/index/InvertedIndexTantivy.h +++ b/internal/core/src/index/InvertedIndexTantivy.h @@ -24,6 +24,36 @@ namespace milvus::index { +inline TantivyDataType +get_tantivy_data_type(proto::schema::DataType data_type) { + switch (data_type) { + case proto::schema::DataType::Bool: { + return TantivyDataType::Bool; + } + + case proto::schema::DataType::Int8: + case proto::schema::DataType::Int16: + case proto::schema::DataType::Int32: + case proto::schema::DataType::Int64: { + return TantivyDataType::I64; + } + + case proto::schema::DataType::Float: + case proto::schema::DataType::Double: { + return TantivyDataType::F64; + } + + case proto::schema::DataType::String: + case proto::schema::DataType::VarChar: { + return TantivyDataType::Keyword; + } + + default: + PanicInfo(ErrorCode::NotImplemented, + fmt::format("not implemented data type: {}", data_type)); + } +} + using TantivyIndexWrapper = milvus::tantivy::TantivyIndexWrapper; using RustArrayWrapper = milvus::tantivy::RustArrayWrapper; @@ -175,10 +205,10 @@ class InvertedIndexTantivy : public ScalarIndex { const TargetBitmap RegexQuery(const std::string& regex_pattern) override; - protected: void BuildWithFieldData(const std::vector& datas) override; + protected: void finish(); @@ -186,6 +216,13 @@ class InvertedIndexTantivy : public ScalarIndex { build_index_for_array( const std::vector>& field_datas); + virtual void + build_index_for_json( + const std::vector>& field_datas) { + PanicInfo(ErrorCode::NotImplemented, + "build_index_for_json not implemented"); + }; + protected: std::shared_ptr wrapper_; TantivyDataType d_type_; diff --git a/internal/core/src/indexbuilder/IndexFactory.h b/internal/core/src/indexbuilder/IndexFactory.h index 6aa0b48302410..a043aafacbfbe 100644 --- a/internal/core/src/indexbuilder/IndexFactory.h +++ b/internal/core/src/indexbuilder/IndexFactory.h @@ -18,6 +18,7 @@ #include "common/EasyAssert.h" #include "indexbuilder/IndexCreatorBase.h" +#include "indexbuilder/JsonInvertedIndexCreator.h" #include "indexbuilder/ScalarIndexCreator.h" #include "indexbuilder/VecIndexCreator.h" #include "indexbuilder/type_c.h" @@ -60,6 +61,7 @@ class IndexFactory { case DataType::VARCHAR: case DataType::STRING: case DataType::ARRAY: + case DataType::JSON: return CreateScalarIndex(type, config, context); case DataType::VECTOR_FLOAT: @@ -68,6 +70,7 @@ class IndexFactory { case DataType::VECTOR_BINARY: case DataType::VECTOR_SPARSE_FLOAT: return std::make_unique(type, config, context); + default: PanicInfo(DataTypeInvalid, fmt::format("invalid type is {}", invalid_dtype_msg)); diff --git a/internal/core/src/indexbuilder/JsonInvertedIndexCreator.cpp b/internal/core/src/indexbuilder/JsonInvertedIndexCreator.cpp new file mode 100644 index 0000000000000..ac110bd615b63 --- /dev/null +++ b/internal/core/src/indexbuilder/JsonInvertedIndexCreator.cpp @@ -0,0 +1,67 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include "indexbuilder/JsonInvertedIndexCreator.h" +#include +#include +#include +#include "common/EasyAssert.h" +#include "common/FieldDataInterface.h" +#include "common/Json.h" +#include "common/Types.h" +#include "log/Log.h" +#include "simdjson/error.h" + +namespace milvus::indexbuilder { + +template +void +JsonInvertedIndexCreator::build_index_for_json( + const std::vector>& field_datas) { + using GetType = + std::conditional_t, std::string_view, T>; + int64_t offset = 0; + LOG_INFO("Start to build json inverted index for field: {}", nested_path_); + for (const auto& data : field_datas) { + auto n = data->get_num_rows(); + for (int64_t i = 0; i < n; i++) { + auto json_column = static_cast(data->RawValue(i)); + if (this->schema_.nullable() && !data->is_valid(i)) { + this->null_offset.push_back(i); + continue; + } + value_result res = json_column->at(nested_path_); + auto err = res.error(); + if (err != simdjson::SUCCESS) { + AssertInfo(err == simdjson::INCORRECT_TYPE || + err == simdjson::NO_SUCH_FIELD, + "Failed to parse json, err: {}", + err); + offset++; + continue; + } + if constexpr (std::is_same_v) { + auto value = std::string(res.value()); + this->wrapper_->template add_data(&value, 1, offset++); + } else { + auto value = res.value(); + this->wrapper_->template add_data(&value, 1, offset++); + } + } + } +} + +template class JsonInvertedIndexCreator; +template class JsonInvertedIndexCreator; +template class JsonInvertedIndexCreator; +template class JsonInvertedIndexCreator; + +} // namespace milvus::indexbuilder \ No newline at end of file diff --git a/internal/core/src/indexbuilder/JsonInvertedIndexCreator.h b/internal/core/src/indexbuilder/JsonInvertedIndexCreator.h new file mode 100644 index 0000000000000..6332406354da9 --- /dev/null +++ b/internal/core/src/indexbuilder/JsonInvertedIndexCreator.h @@ -0,0 +1,64 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#pragma once +#include "common/FieldDataInterface.h" +#include "index/InvertedIndexTantivy.h" +#include "storage/FileManager.h" +#include "boost/filesystem.hpp" +#include "tantivy-binding.h" + +namespace milvus::indexbuilder { + +template +class JsonInvertedIndexCreator : public index::InvertedIndexTantivy { + public: + JsonInvertedIndexCreator(const proto::schema::DataType cast_type, + const std::string& nested_path, + const storage::FileManagerContext& ctx) + : nested_path_(nested_path) { + this->schema_ = ctx.fieldDataMeta.field_schema; + this->mem_file_manager_ = + std::make_shared(ctx); + this->disk_file_manager_ = + std::make_shared(ctx); + + auto prefix = this->disk_file_manager_->GetTextIndexIdentifier(); + constexpr const char* TMP_INVERTED_INDEX_PREFIX = + "/tmp/milvus/inverted-index/"; + this->path_ = std::string(TMP_INVERTED_INDEX_PREFIX) + prefix; + + this->d_type_ = index::get_tantivy_data_type(cast_type); + boost::filesystem::create_directories(this->path_); + std::string field_name = std::to_string( + this->disk_file_manager_->GetFieldDataMeta().field_id); + this->wrapper_ = std::make_shared( + field_name.c_str(), this->d_type_, this->path_.c_str()); + } + + void + build_index_for_json(const std::vector>& + field_datas) override; + + void + finish() { + this->wrapper_->finish(); + } + + void + create_reader() { + this->wrapper_->create_reader(); + } + + private: + std::string nested_path_; +}; +} // namespace milvus::indexbuilder \ No newline at end of file diff --git a/internal/core/src/indexbuilder/ScalarIndexCreator.cpp b/internal/core/src/indexbuilder/ScalarIndexCreator.cpp index 855be1476017f..4aedeba8bb0dd 100644 --- a/internal/core/src/indexbuilder/ScalarIndexCreator.cpp +++ b/internal/core/src/indexbuilder/ScalarIndexCreator.cpp @@ -10,6 +10,9 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include "indexbuilder/ScalarIndexCreator.h" +#include "common/Consts.h" +#include "common/FieldDataInterface.h" +#include "common/Types.h" #include "index/IndexFactory.h" #include "index/IndexInfo.h" #include "index/Meta.h" @@ -32,6 +35,11 @@ ScalarIndexCreator::ScalarIndexCreator( } index_info.field_type = dtype_; index_info.index_type = index_type(); + if (dtype == DataType::JSON) { + index_info.json_cast_type = static_cast( + std::stoi(config.at(JSON_CAST_TYPE).get())); + index_info.json_path = config.at(JSON_PATH).get(); + } index_ = index::IndexFactory::GetInstance().CreateIndex( index_info, file_manager_context); } diff --git a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp index 1f6a1aa1e409e..22803038c6329 100644 --- a/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp +++ b/internal/core/src/segcore/ChunkedSegmentSealedImpl.cpp @@ -167,13 +167,22 @@ ChunkedSegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) { auto field_id = FieldId(info.field_id); auto& field_meta = schema_->operator[](field_id); - auto row_count = info.index->Count(); - AssertInfo(row_count > 0, "Index count is 0"); - std::unique_lock lck(mutex_); AssertInfo( !get_bit(index_ready_bitset_, field_id), "scalar index has been exist at " + std::to_string(field_id.get())); + + if (field_meta.get_data_type() == DataType::JSON) { + auto path = info.index_params.at(JSON_PATH); + JSONIndexKey key; + key.nested_path = path; + key.field_id = field_id; + json_indexings_[key] = + std::move(const_cast(info).index); + return; + } + auto row_count = info.index->Count(); + AssertInfo(row_count > 0, "Index count is 0"); if (num_rows_.has_value()) { AssertInfo(num_rows_.value() == row_count, "field (" + std::to_string(field_id.get()) + @@ -1823,6 +1832,8 @@ ChunkedSegmentSealedImpl::HasRawData(int64_t field_id) const { field_indexing->indexing_.get()); return vec_index->HasRawData(); } + } else if (IsJsonDataType(field_meta.get_data_type())) { + return get_bit(field_data_ready_bitset_, fieldID); } else { auto scalar_index = scalar_indexings_.find(fieldID); if (scalar_index != scalar_indexings_.end()) { diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 23fc4ff60f184..8ccb24914f145 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -304,6 +304,11 @@ class SegmentGrowingImpl : public SegmentGrowing { return false; } + bool + HasIndex(FieldId field_id, const std::string& nested_path) const override { + return false; + }; + bool HasFieldData(FieldId field_id) const override { return true; diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 90e34ce78a7b2..98cb10d6ed86d 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -21,6 +21,7 @@ #include "FieldIndexing.h" #include "common/Common.h" +#include "common/EasyAssert.h" #include "common/Schema.h" #include "common/Span.h" #include "common/SystemProperty.h" @@ -239,6 +240,18 @@ class SegmentInternalInterface : public SegmentInterface { std::to_string(field_id); } + template + const index::ScalarIndex& + chunk_scalar_index(FieldId field_id, + std::string path, + int64_t chunk_id) const { + using IndexType = index::ScalarIndex; + auto base_ptr = chunk_index_impl(field_id, path, chunk_id); + auto ptr = dynamic_cast(base_ptr); + AssertInfo(ptr, "entry mismatch"); + return *ptr; + } + std::unique_ptr Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_group, @@ -268,6 +281,10 @@ class SegmentInternalInterface : public SegmentInterface { virtual bool HasIndex(FieldId field_id) const = 0; + virtual bool + HasIndex(FieldId field_id, const std::string& nested_path) const { + PanicInfo(ErrorCode::NotImplemented, "not implemented"); + }; virtual bool HasFieldData(FieldId field_id) const = 0; @@ -450,6 +467,13 @@ class SegmentInternalInterface : public SegmentInterface { get_timestamps() const = 0; public: + virtual const index::IndexBase* + chunk_index_impl(FieldId field_id, + std::string path, + int64_t chunk_id) const { + PanicInfo(ErrorCode::NotImplemented, "not implemented"); + }; + // calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to system_type virtual void bulk_subscript(SystemFieldType system_type, diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h index 5078fbc11a5c6..e0097addf78f2 100644 --- a/internal/core/src/segcore/SegmentSealed.h +++ b/internal/core/src/segcore/SegmentSealed.h @@ -59,6 +59,53 @@ class SegmentSealed : public SegmentInternalInterface { type() const override { return SegmentType::Sealed; } + + index::IndexBase* + chunk_index_impl(FieldId field_id, + std::string path, + int64_t chunk_id) const override { + JSONIndexKey key; + key.field_id = field_id; + key.nested_path = path; + AssertInfo(json_indexings_.find(key) != json_indexings_.end(), + "Cannot find json index with path: " + path); + return json_indexings_.at(key).get(); + } + + virtual bool + HasIndex(FieldId field_id) const override = 0; + bool + HasIndex(FieldId field_id, const std::string& path) const override { + JSONIndexKey key; + key.field_id = field_id; + key.nested_path = path; + return json_indexings_.find(key) != json_indexings_.end(); + } + + protected: + struct JSONIndexKey { + FieldId field_id; + std::string nested_path; + bool + operator==(const JSONIndexKey& other) const { + return field_id == other.field_id && + nested_path == other.nested_path; + } + }; + + struct hash_helper { + size_t + operator()(const JSONIndexKey& k) const { + std::hash h1; + std::hash h2; + size_t hash_result = 0; + boost::hash_combine(hash_result, h1(k.field_id.get())); + boost::hash_combine(hash_result, h2(k.nested_path)); + return hash_result; + } + }; + std::unordered_map + json_indexings_; }; using SegmentSealedSPtr = std::shared_ptr; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 3abede7800536..1e50aab9528d5 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -205,13 +205,23 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) { auto field_id = FieldId(info.field_id); auto& field_meta = schema_->operator[](field_id); - auto row_count = info.index->Count(); - AssertInfo(row_count > 0, "Index count is 0"); - std::unique_lock lck(mutex_); AssertInfo( !get_bit(index_ready_bitset_, field_id), "scalar index has been exist at " + std::to_string(field_id.get())); + + if (field_meta.get_data_type() == DataType::JSON) { + auto path = info.index_params.at(JSON_PATH); + JSONIndexKey key; + key.nested_path = path; + key.field_id = field_id; + json_indexings_[key] = + std::move(const_cast(info).index); + return; + } + auto row_count = info.index->Count(); + AssertInfo(row_count > 0, "Index count is 0"); + if (num_rows_.has_value()) { AssertInfo(num_rows_.value() == row_count, "field (" + std::to_string(field_id.get()) + @@ -220,7 +230,6 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) { ") than other column's row count (" + std::to_string(num_rows_.value()) + ")"); } - scalar_indexings_[field_id] = std::move(const_cast(info).index); // reverse pk from scalar index and set pks to offset @@ -685,7 +694,6 @@ SegmentSealedImpl::num_chunk_index(FieldId field_id) const { if (field_meta.is_vector()) { return int64_t(vector_indexings_.is_ready(field_id)); } - return scalar_indexings_.count(field_id); } @@ -1647,6 +1655,8 @@ SegmentSealedImpl::HasRawData(int64_t field_id) const { field_indexing->indexing_.get()); return vec_index->HasRawData(); } + } else if (IsJsonDataType(field_meta.get_data_type())) { + return get_bit(field_data_ready_bitset_, fieldID); } else { auto scalar_index = scalar_indexings_.find(fieldID); if (scalar_index != scalar_indexings_.end()) { diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index 16d7bcdd89344..185bc97012aff 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -11,6 +11,7 @@ #include "segcore/load_index_c.h" +#include "common/Consts.h" #include "common/FieldMeta.h" #include "common/EasyAssert.h" #include "common/Types.h" @@ -305,6 +306,11 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) { load_index_info->index_params); config[milvus::index::INDEX_FILES] = load_index_info->index_files; + if (load_index_info->field_type == milvus::DataType::JSON) { + index_info.json_cast_type = static_cast( + std::stoi(config.at(JSON_CAST_TYPE).get())); + index_info.json_path = config.at(JSON_PATH).get(); + } milvus::storage::FileManagerContext fileManagerContext( field_meta, index_meta, remote_chunk_manager); fileManagerContext.set_for_loading_index(true); diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h index 61a2088a2d26f..57fa7c3d1fddf 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h +++ b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h @@ -93,14 +93,24 @@ RustResult tantivy_term_query_i64(void *ptr, int64_t term); RustResult tantivy_lower_bound_range_query_i64(void *ptr, int64_t lower_bound, bool inclusive); +RustResult tantivy_lower_bound_range_query_bool(void *ptr, bool lower_bound, bool inclusive); + RustResult tantivy_upper_bound_range_query_i64(void *ptr, int64_t upper_bound, bool inclusive); +RustResult tantivy_upper_bound_range_query_bool(void *ptr, bool upper_bound, bool inclusive); + RustResult tantivy_range_query_i64(void *ptr, int64_t lower_bound, int64_t upper_bound, bool lb_inclusive, bool ub_inclusive); +RustResult tantivy_range_query_bool(void *ptr, + bool lower_bound, + bool upper_bound, + bool lb_inclusive, + bool ub_inclusive); + RustResult tantivy_term_query_f64(void *ptr, double term); RustResult tantivy_lower_bound_range_query_f64(void *ptr, double lower_bound, bool inclusive); diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs index cb5f989070d10..05c6ee37e9a04 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader.rs @@ -119,6 +119,38 @@ impl IndexReaderWrapper { self.search(&q) } + pub fn lower_bound_range_query_bool( + &self, + lower_bound: bool, + inclusive: bool, + ) -> Result> { + let lower_bound = make_bounds(Term::from_field_bool(self.field, lower_bound), inclusive); + let upper_bound = Bound::Unbounded; + let q = RangeQuery::new_term_bounds( + self.field_name.to_string(), + tantivy::schema::Type::Bool, + &lower_bound, + &upper_bound, + ); + self.search(&q) + } + + pub fn upper_bound_range_query_bool( + &self, + upper_bound: bool, + inclusive: bool, + ) -> Result> { + let lower_bound = Bound::Unbounded; + let upper_bound = make_bounds(Term::from_field_bool(self.field, upper_bound), inclusive); + let q = RangeQuery::new_term_bounds( + self.field_name.to_string(), + tantivy::schema::Type::Bool, + &lower_bound, + &upper_bound, + ); + self.search(&q) + } + pub fn range_query_i64( &self, lower_bound: i64, @@ -140,6 +172,24 @@ impl IndexReaderWrapper { self.search(&q) } + pub fn range_query_bool( + &self, + lower_bound: bool, + upper_bound: bool, + lb_inclusive: bool, + ub_inclusive: bool, + ) -> Result> { + let lower_bound = make_bounds(Term::from_field_bool(self.field, lower_bound), lb_inclusive); + let upper_bound = make_bounds(Term::from_field_bool(self.field, upper_bound), ub_inclusive); + let q = RangeQuery::new_term_bounds( + self.field_name.to_string(), + tantivy::schema::Type::Bool, + &lower_bound, + &upper_bound, + ); + self.search(&q) + } + pub fn lower_bound_range_query_f64( &self, lower_bound: f64, diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_c.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_c.rs index 1adedbd49cf28..71b18f712160d 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_c.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_reader_c.rs @@ -60,6 +60,20 @@ pub extern "C" fn tantivy_lower_bound_range_query_i64( } } +#[no_mangle] +pub extern "C" fn tantivy_lower_bound_range_query_bool( + ptr: *mut c_void, + lower_bound: bool, + inclusive: bool, +) -> RustResult { + let real = ptr as *mut IndexReaderWrapper; + unsafe { + (*real) + .lower_bound_range_query_bool(lower_bound, inclusive) + .into() + } +} + #[no_mangle] pub extern "C" fn tantivy_upper_bound_range_query_i64( ptr: *mut c_void, @@ -74,6 +88,20 @@ pub extern "C" fn tantivy_upper_bound_range_query_i64( } } +#[no_mangle] +pub extern "C" fn tantivy_upper_bound_range_query_bool( + ptr: *mut c_void, + upper_bound: bool, + inclusive: bool, +) -> RustResult { + let real = ptr as *mut IndexReaderWrapper; + unsafe { + (*real) + .upper_bound_range_query_bool(upper_bound, inclusive) + .into() + } +} + #[no_mangle] pub extern "C" fn tantivy_range_query_i64( ptr: *mut c_void, @@ -90,6 +118,21 @@ pub extern "C" fn tantivy_range_query_i64( } } +#[no_mangle] +pub extern "C" fn tantivy_range_query_bool( + ptr: *mut c_void, + lower_bound: bool, + upper_bound: bool, + lb_inclusive: bool, + ub_inclusive: bool, +) -> RustResult { + let real = ptr as *mut IndexReaderWrapper; + unsafe { + (*real) + .range_query_bool(lower_bound, upper_bound, lb_inclusive, ub_inclusive) + .into() + } +} #[no_mangle] pub extern "C" fn tantivy_term_query_f64(ptr: *mut c_void, term: f64) -> RustResult { let real = ptr as *mut IndexReaderWrapper; diff --git a/internal/core/thirdparty/tantivy/tantivy-wrapper.h b/internal/core/thirdparty/tantivy/tantivy-wrapper.h index b6e956b47d5a2..c65379e7e8bb5 100644 --- a/internal/core/thirdparty/tantivy/tantivy-wrapper.h +++ b/internal/core/thirdparty/tantivy/tantivy-wrapper.h @@ -5,6 +5,7 @@ #include #include #include +#include #include "common/EasyAssert.h" #include "tantivy-binding.h" @@ -425,6 +426,11 @@ struct TantivyIndexWrapper { RustArrayWrapper lower_bound_range_query(T lower_bound, bool inclusive) { auto array = [&]() { + if constexpr (std::is_same_v) { + return tantivy_lower_bound_range_query_bool( + reader_, static_cast(lower_bound), inclusive); + } + if constexpr (std::is_integral_v) { return tantivy_lower_bound_range_query_i64( reader_, static_cast(lower_bound), inclusive); @@ -462,6 +468,11 @@ struct TantivyIndexWrapper { RustArrayWrapper upper_bound_range_query(T upper_bound, bool inclusive) { auto array = [&]() { + if constexpr (std::is_same_v) { + return tantivy_upper_bound_range_query_bool( + reader_, static_cast(upper_bound), inclusive); + } + if constexpr (std::is_integral_v) { return tantivy_upper_bound_range_query_i64( reader_, static_cast(upper_bound), inclusive); @@ -502,6 +513,14 @@ struct TantivyIndexWrapper { bool lb_inclusive, bool ub_inclusive) { auto array = [&]() { + if constexpr (std::is_same_v) { + return tantivy_range_query_bool(reader_, + static_cast(lower_bound), + static_cast(upper_bound), + lb_inclusive, + ub_inclusive); + } + if constexpr (std::is_integral_v) { return tantivy_range_query_i64( reader_, diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index e161d2ae08794..10c9c02ea53ea 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -13,16 +13,25 @@ #include #include #include +#include #include #include #include +#include #include #include #include +#include "common/FieldDataInterface.h" #include "common/Json.h" +#include "common/LoadInfo.h" #include "common/Types.h" +#include "index/Meta.h" +#include "indexbuilder/JsonInvertedIndexCreator.h" +#include "knowhere/comp/index_param.h" +#include "mmap/Types.h" #include "pb/plan.pb.h" +#include "pb/schema.pb.h" #include "query/Plan.h" #include "query/PlanNode.h" #include "query/PlanProto.h" @@ -30,6 +39,8 @@ #include "segcore/SegmentGrowingImpl.h" #include "simdjson/padded_string.h" #include "segcore/segment_c.h" +#include "storage/FileManager.h" +#include "storage/Types.h" #include "test_utils/DataGen.h" #include "test_utils/GenExprProto.h" #include "index/IndexFactory.h" @@ -15634,3 +15645,116 @@ TEST_P(ExprTest, TestJsonContainsDiffTypeNullable) { } } } + +template +class JsonIndexTestFixture : public testing::Test { + public: + using DataType = T; + + JsonIndexTestFixture() { + if constexpr (std::is_same_v) { + schema_data_type = proto::schema::Bool; + json_path = "/bool"; + lower_bound.set_bool_val(std::numeric_limits::min()); + upper_bound.set_bool_val(std::numeric_limits::max()); + cast_type = milvus::DataType::BOOL; + } else if constexpr (std::is_same_v) { + schema_data_type = proto::schema::Int64; + json_path = "/int"; + lower_bound.set_int64_val(std::numeric_limits::min()); + upper_bound.set_int64_val(std::numeric_limits::max()); + cast_type = milvus::DataType::INT64; + } else if constexpr (std::is_same_v) { + schema_data_type = proto::schema::Double; + json_path = "/double"; + lower_bound.set_float_val(std::numeric_limits::min()); + upper_bound.set_float_val(std::numeric_limits::max()); + cast_type = milvus::DataType::DOUBLE; + } else if constexpr (std::is_same_v) { + schema_data_type = proto::schema::String; + json_path = "/string"; + lower_bound.set_string_val(""); + std::string s(1024, '9'); + upper_bound.set_string_val(s); + cast_type = milvus::DataType::STRING; + } + } + proto::schema::DataType schema_data_type; + std::string json_path; + proto::plan::GenericValue lower_bound; + proto::plan::GenericValue upper_bound; + milvus::DataType cast_type; +}; + +using JsonIndexTypes = ::testing::Types; +TYPED_TEST_SUITE(JsonIndexTestFixture, JsonIndexTypes); + +TYPED_TEST(JsonIndexTestFixture, TestJsonIndexUnaryExpr) { + auto schema = std::make_shared(); + auto vec_fid = schema->AddDebugField( + "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); + auto i32_fid = schema->AddDebugField("age32", DataType::INT32); + auto i64_fid = schema->AddDebugField("age64", DataType::INT64); + auto json_fid = schema->AddDebugField("json", DataType::JSON); + schema->set_primary_field_id(i64_fid); + + auto seg = CreateSealedSegment(schema); + int N = 1000; + auto raw_data = DataGen(schema, N); + segcore::LoadIndexInfo load_index_info; + + auto file_manager_ctx = storage::FileManagerContext(); + file_manager_ctx.fieldDataMeta.field_schema.set_data_type( + milvus::proto::schema::JSON); + file_manager_ctx.fieldDataMeta.field_schema.set_fieldid(json_fid.get()); + auto inv_index = index::IndexFactory::GetInstance().CreateJsonIndex( + index::INVERTED_INDEX_TYPE, + this->cast_type, + this->json_path, + file_manager_ctx); + + using json_index_type = + indexbuilder::JsonInvertedIndexCreator; + auto json_index = std::unique_ptr( + static_cast(inv_index.release())); + auto json_col = raw_data.get_col(json_fid); + auto json_field = + std::make_shared>(DataType::JSON, false); + std::vector jsons; + + for (auto& json : json_col) { + jsons.push_back(milvus::Json(simdjson::padded_string(json))); + } + json_field->add_json_data(jsons); + + json_index->BuildWithFieldData({json_field}); + json_index->finish(); + json_index->create_reader(); + + load_index_info.field_id = json_fid.get(); + load_index_info.field_type = DataType::JSON; + load_index_info.index = std::move(json_index); + load_index_info.index_params = {{JSON_PATH, this->json_path}}; + seg->LoadIndex(load_index_info); + + auto json_field_data_info = FieldDataInfo(json_fid.get(), N, {json_field}); + seg->LoadFieldData(json_fid, json_field_data_info); + + auto unary_expr = std::make_shared( + expr::ColumnInfo(json_fid, DataType::JSON, {this->json_path.substr(1)}), + proto::plan::OpType::LessEqual, + this->upper_bound); + auto plan = + std::make_shared(DEFAULT_PLANNODE_ID, unary_expr); + auto final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); + EXPECT_EQ(final.count(), N); + + unary_expr = std::make_shared( + expr::ColumnInfo(json_fid, DataType::JSON, {this->json_path.substr(1)}), + proto::plan::OpType::GreaterEqual, + this->lower_bound); + plan = + std::make_shared(DEFAULT_PLANNODE_ID, unary_expr); + final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); + EXPECT_EQ(final.count(), N); +} diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index bc12f4f88da1c..0252bce74fda0 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -231,6 +231,23 @@ func (m *indexMeta) updateIndexTasksMetrics() { } } +func checkJsonParams(index *model.Index, req *indexpb.CreateIndexRequest) bool { + castType1, err := getIndexParam(index.IndexParams, common.JSONCastTypeKey) + if err != nil { + return false + } + castType2, err := getIndexParam(req.GetIndexParams(), common.JSONCastTypeKey) + if err != nil || castType1 != castType2 { + return false + } + jsonPath1, err := getIndexParam(index.IndexParams, common.JSONPathKey) + if err != nil { + return false + } + jsonPath2, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey) + return err == nil && jsonPath1 == jsonPath2 +} + func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool { if len(fieldIndex.TypeParams) != len(req.TypeParams) { return false @@ -317,7 +334,7 @@ func checkParams(fieldIndex *model.Index, req *indexpb.CreateIndexRequest) bool return !notEq } -func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, error) { +func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest, isJson bool) (UniqueID, error) { m.RLock() defer m.RUnlock() @@ -330,7 +347,7 @@ func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, e continue } if req.IndexName == index.IndexName { - if req.FieldID == index.FieldID && checkParams(index, req) { + if req.FieldID == index.FieldID && checkParams(index, req) && (!isJson || checkJsonParams(index, req)) { return index.IndexID, nil } errMsg := "at most one distinct index is allowed per field" @@ -342,6 +359,20 @@ func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, e return 0, fmt.Errorf("CreateIndex failed: %s", errMsg) } if req.FieldID == index.FieldID { + if isJson { + // if it is json index, check if json paths are same + jsonPath1, err := getIndexParam(index.IndexParams, common.JSONPathKey) + if err != nil { + return 0, err + } + jsonPath2, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey) + if err != nil { + return 0, err + } + if jsonPath1 != jsonPath2 { + continue + } + } // creating multiple indexes on same field is not supported errMsg := "CreateIndex failed: creating multiple indexes on same field is not supported" log.Warn(errMsg) @@ -1040,6 +1071,7 @@ func (m *indexMeta) AreAllDiskIndex(collectionID int64, schema *schemapb.Collect return t.FieldID, GetIndexType(t.IndexParams) }) vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool { + if indexType, ok := fieldIndexTypes[field.FieldID]; ok { return vecindexmgr.GetVecIndexMgrInstance().IsDiskVecIndex(indexType) } diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index 8e4ddfccacf76..37d2bc9b00411 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -132,7 +132,7 @@ func TestMeta_ScalarAutoIndex(t *testing.T) { UserIndexParams: userIndexParams, }, } - tmpIndexID, err := m.CanCreateIndex(req) + tmpIndexID, err := m.CanCreateIndex(req, false) assert.NoError(t, err) assert.Equal(t, int64(indexID), tmpIndexID) }) @@ -154,12 +154,12 @@ func TestMeta_ScalarAutoIndex(t *testing.T) { }, } req.UserIndexParams = append(req.UserIndexParams, &commonpb.KeyValuePair{Key: "bitmap_cardinality_limit", Value: "1000"}) - tmpIndexID, err := m.CanCreateIndex(req) + tmpIndexID, err := m.CanCreateIndex(req, false) assert.Error(t, err) assert.Equal(t, int64(0), tmpIndexID) req.UserIndexParams = append(req.UserIndexParams, &commonpb.KeyValuePair{Key: "bitmap_cardinality_limit", Value: "500"}) - tmpIndexID, err = m.CanCreateIndex(req) + tmpIndexID, err = m.CanCreateIndex(req, false) assert.Error(t, err) assert.Equal(t, int64(0), tmpIndexID) }) @@ -201,7 +201,7 @@ func TestMeta_ScalarAutoIndex(t *testing.T) { UserIndexParams: userIndexParams, }, } - tmpIndexID, err := m.CanCreateIndex(req) + tmpIndexID, err := m.CanCreateIndex(req, false) assert.NoError(t, err) assert.Equal(t, int64(indexID), tmpIndexID) newIndexParams := req.GetIndexParams() @@ -266,7 +266,7 @@ func TestMeta_CanCreateIndex(t *testing.T) { } t.Run("can create index", func(t *testing.T) { - tmpIndexID, err := m.CanCreateIndex(req) + tmpIndexID, err := m.CanCreateIndex(req, false) assert.NoError(t, err) assert.Equal(t, int64(0), tmpIndexID) index := &model.Index{ @@ -286,37 +286,37 @@ func TestMeta_CanCreateIndex(t *testing.T) { err = m.CreateIndex(context.TODO(), index) assert.NoError(t, err) - tmpIndexID, err = m.CanCreateIndex(req) + tmpIndexID, err = m.CanCreateIndex(req, false) assert.NoError(t, err) assert.Equal(t, indexID, tmpIndexID) }) t.Run("params not consistent", func(t *testing.T) { req.TypeParams = append(req.TypeParams, &commonpb.KeyValuePair{Key: "primary_key", Value: "false"}) - tmpIndexID, err := m.CanCreateIndex(req) + tmpIndexID, err := m.CanCreateIndex(req, false) assert.Error(t, err) assert.Equal(t, int64(0), tmpIndexID) req.TypeParams = []*commonpb.KeyValuePair{{Key: common.DimKey, Value: "64"}} - tmpIndexID, err = m.CanCreateIndex(req) + tmpIndexID, err = m.CanCreateIndex(req, false) assert.Error(t, err) assert.Equal(t, int64(0), tmpIndexID) req.TypeParams = typeParams req.UserIndexParams = append(indexParams, &commonpb.KeyValuePair{Key: "metrics_type", Value: "L2"}) - tmpIndexID, err = m.CanCreateIndex(req) + tmpIndexID, err = m.CanCreateIndex(req, false) assert.Error(t, err) assert.Equal(t, int64(0), tmpIndexID) req.IndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "HNSW"}} req.UserIndexParams = req.IndexParams - tmpIndexID, err = m.CanCreateIndex(req) + tmpIndexID, err = m.CanCreateIndex(req, false) assert.Error(t, err) assert.Equal(t, int64(0), tmpIndexID) req.IndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "FLAT"}, {Key: common.MetricTypeKey, Value: "COSINE"}} req.UserIndexParams = req.IndexParams - tmpIndexID, err = m.CanCreateIndex(req) + tmpIndexID, err = m.CanCreateIndex(req, false) assert.Error(t, err) assert.Equal(t, int64(0), tmpIndexID) @@ -325,7 +325,7 @@ func TestMeta_CanCreateIndex(t *testing.T) { req.IndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "FLAT"}, {Key: common.MetricTypeKey, Value: "COSINE"}} req.UserIndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "AUTOINDEX"}, {Key: common.MetricTypeKey, Value: "COSINE"}} req.UserAutoindexMetricTypeSpecified = false - tmpIndexID, err = m.CanCreateIndex(req) + tmpIndexID, err = m.CanCreateIndex(req, false) assert.NoError(t, err) assert.Equal(t, indexID, tmpIndexID) // req should follow the meta @@ -336,14 +336,14 @@ func TestMeta_CanCreateIndex(t *testing.T) { req.IndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "FLAT"}, {Key: common.MetricTypeKey, Value: "COSINE"}} req.UserIndexParams = []*commonpb.KeyValuePair{{Key: common.IndexTypeKey, Value: "AUTOINDEX"}, {Key: common.MetricTypeKey, Value: "COSINE"}} req.UserAutoindexMetricTypeSpecified = true - tmpIndexID, err = m.CanCreateIndex(req) + tmpIndexID, err = m.CanCreateIndex(req, false) assert.Error(t, err) assert.Equal(t, int64(0), tmpIndexID) req.IndexParams = indexParams req.UserIndexParams = indexParams req.FieldID++ - tmpIndexID, err = m.CanCreateIndex(req) + tmpIndexID, err = m.CanCreateIndex(req, false) assert.Error(t, err) assert.Equal(t, int64(0), tmpIndexID) }) @@ -351,14 +351,14 @@ func TestMeta_CanCreateIndex(t *testing.T) { t.Run("multiple indexes", func(t *testing.T) { req.IndexName = "_default_idx_2" req.FieldID = fieldID - tmpIndexID, err := m.CanCreateIndex(req) + tmpIndexID, err := m.CanCreateIndex(req, false) assert.Error(t, err) assert.Equal(t, int64(0), tmpIndexID) }) t.Run("index has been deleted", func(t *testing.T) { m.indexes[collID][indexID].IsDeleted = true - tmpIndexID, err := m.CanCreateIndex(req) + tmpIndexID, err := m.CanCreateIndex(req, false) assert.NoError(t, err) assert.Equal(t, int64(0), tmpIndexID) }) diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 62a51200c812e..42eaa45939734 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -19,17 +19,21 @@ package datacoord import ( "context" "fmt" + "strconv" + "strings" "time" "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/util/indexparamcheck" "github.com/milvus-io/milvus/internal/util/vecindexmgr" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -164,18 +168,98 @@ func (s *Server) createIndexForSegmentLoop(ctx context.Context) { } } -func (s *Server) getFieldNameByID(ctx context.Context, collID, fieldID int64) (string, error) { +func (s *Server) getFieldNameByID(schema *schemapb.CollectionSchema, collID, fieldID int64) (string, error) { + for _, field := range schema.GetFields() { + if field.FieldID == fieldID { + return field.Name, nil + } + } + return "", nil +} + +func (s *Server) getSchema(ctx context.Context, collID int64) (*schemapb.CollectionSchema, error) { resp, err := s.broker.DescribeCollectionInternal(ctx, collID) if err != nil { - return "", err + return nil, err } + return resp.GetSchema(), nil +} - for _, field := range resp.GetSchema().GetFields() { - if field.FieldID == fieldID { - return field.Name, nil +func isJsonField(schema *schemapb.CollectionSchema, fieldID int64) (bool, error) { + for _, f := range schema.Fields { + if f.FieldID == fieldID { + return typeutil.IsJSONType(f.DataType), nil } } - return "", nil + return false, merr.WrapErrFieldNotFound(fieldID) +} + +func getIndexParam(indexParams []*commonpb.KeyValuePair, key string) (string, error) { + for _, p := range indexParams { + if p.Key == key { + return p.Value, nil + } + } + return "", merr.WrapErrParameterInvalidMsg("%s not found", key) +} + +func setIndexParam(indexParams []*commonpb.KeyValuePair, key, value string) { + for _, p := range indexParams { + if p.Key == key { + p.Value = value + } + } +} + +func (s *Server) parseAndVerifyNestedPath(identifier string, schema *schemapb.CollectionSchema, fieldID int64) (string, error) { + fieldName := strings.Split(identifier, "[")[0] + nestedPath := make([]string, 0) + helper, err := typeutil.CreateSchemaHelper(schema) + if err != nil { + return "", err + } + field, err := helper.GetFieldFromNameDefaultJSON(fieldName) + if err != nil { + return "", err + } + if field.FieldID != fieldID { + return "", fmt.Errorf("fieldID not match with field name") + } + if field.GetDataType() != schemapb.DataType_JSON && + field.GetDataType() != schemapb.DataType_Array { + errMsg := fmt.Sprintf("%s data type not supported accessed with []", field.GetDataType()) + return "", fmt.Errorf(errMsg) + } + if fieldName != field.Name { + r := strings.ReplaceAll(fieldName, "~", "~0") + r = strings.ReplaceAll(r, "/", "~1") + nestedPath = append(nestedPath, r) + } + jsonKeyStr := identifier[len(fieldName):] + ss := strings.Split(jsonKeyStr, "][") + for i := 0; i < len(ss); i++ { + path := strings.Trim(ss[i], "[]") + if path == "" { + return "", fmt.Errorf("invalid identifier: %s", identifier) + } + if (strings.HasPrefix(path, "\"") && strings.HasSuffix(path, "\"")) || + (strings.HasPrefix(path, "'") && strings.HasSuffix(path, "'")) { + path = path[1 : len(path)-1] + if path == "" { + return "", fmt.Errorf("invalid identifier: %s", identifier) + } + if typeutil.IsArrayType(field.DataType) { + return "", fmt.Errorf("can only access array field with integer index") + } + } else if _, err := strconv.ParseInt(path, 10, 64); err != nil { + return "", fmt.Errorf("json key must be enclosed in double quotes or single quotes: \"%s\"", path) + } + r := strings.ReplaceAll(path, "~", "~0") + r = strings.ReplaceAll(r, "/", "~1") + nestedPath = append(nestedPath, r) + } + + return "/" + strings.Join(nestedPath, "/"), nil } // CreateIndex create an index on collection. @@ -199,21 +283,61 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques } metrics.IndexRequestCounter.WithLabelValues(metrics.TotalLabel).Inc() + schema, err := s.getSchema(ctx, req.GetCollectionID()) + if err != nil { + return merr.Status(err), nil + } + isJson, err := isJsonField(schema, req.GetFieldID()) + if err != nil { + return merr.Status(err), nil + } + + if isJson { + jsonPath, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey) + if err != nil { + log.Error("get json path from index params failed", zap.Error(err)) + return merr.Status(err), nil + } + nestedPath, err := s.parseAndVerifyNestedPath(jsonPath, schema, req.GetFieldID()) + if err != nil { + log.Error("parse nested path failed", zap.Error(err)) + return merr.Status(err), nil + } + setIndexParam(req.GetIndexParams(), common.JSONPathKey, nestedPath) + } + if req.GetIndexName() == "" { indexes := s.meta.indexMeta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName()) - if len(indexes) == 0 { - fieldName, err := s.getFieldNameByID(ctx, req.GetCollectionID(), req.GetFieldID()) + var defaultIndexName string + if isJson { + jsonPath, err := getIndexParam(req.GetIndexParams(), common.JSONPathKey) + if err != nil { + return merr.Status(err), nil + } + + indexes = lo.Filter(indexes, func(index *model.Index, i int) bool { + path, err := getIndexParam(index.IndexParams, common.JSONPathKey) + return err == nil && path == jsonPath + }) + + defaultIndexName = jsonPath + } else { + fieldName, err := s.getFieldNameByID(schema, req.GetCollectionID(), req.GetFieldID()) if err != nil { log.Warn("get field name from schema failed", zap.Int64("fieldID", req.GetFieldID())) return merr.Status(err), nil } - req.IndexName = fieldName + defaultIndexName = fieldName + } + + if len(indexes) == 0 { + req.IndexName = defaultIndexName } else if len(indexes) == 1 { req.IndexName = indexes[0].IndexName } } - indexID, err := s.meta.indexMeta.CanCreateIndex(req) + indexID, err := s.meta.indexMeta.CanCreateIndex(req, isJson) if err != nil { metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() return merr.Status(err), nil diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index 4923f576da968..8ef92dda59e40 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -19,6 +19,7 @@ package datacoord import ( "context" "fmt" + "strconv" "testing" "time" @@ -2511,3 +2512,125 @@ func TestValidateIndexParams(t *testing.T) { assert.Error(t, err) }) } + +func TestJsonIndex(t *testing.T) { + catalog := catalogmocks.NewDataCoordCatalog(t) + catalog.EXPECT().CreateIndex(mock.Anything, mock.Anything).Return(nil).Maybe() + mock0Allocator := newMockAllocator(t) + indexMeta := newSegmentIndexMeta(catalog) + b := mocks.NewMockRootCoordClient(t) + b.EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{ + Status: &commonpb.Status{ + ErrorCode: 0, + Code: 0, + }, + Schema: &schemapb.CollectionSchema{ + Name: "test_index", + Fields: []*schemapb.FieldSchema{ + { + FieldID: 0, + Name: "json", + DataType: schemapb.DataType_JSON, + }, + { + FieldID: 1, + Name: "json2", + DataType: schemapb.DataType_JSON, + }, + { + FieldID: 2, + Name: "dynamic", + DataType: schemapb.DataType_JSON, + IsDynamic: true, + }, + }, + }, + }, nil) + + s := &Server{ + meta: &meta{ + catalog: catalog, + collections: map[UniqueID]*collectionInfo{ + collID: { + ID: collID, + }, + }, + indexMeta: indexMeta, + }, + allocator: mock0Allocator, + notifyIndexChan: make(chan UniqueID, 1), + broker: broker.NewCoordinatorBroker(b), + } + s.stateCode.Store(commonpb.StateCode_Healthy) + + req := &indexpb.CreateIndexRequest{ + FieldID: 0, + IndexName: "a", + IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_String))}, {Key: common.JSONPathKey, Value: "json[\"a\"]"}}, + } + resp, err := s.CreateIndex(context.Background(), req) + assert.NoError(t, merr.CheckRPCCall(resp, err)) + + req = &indexpb.CreateIndexRequest{ + FieldID: 0, + IndexName: "", + IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_String))}, {Key: common.JSONPathKey, Value: "json[\"c\"]"}}, + } + resp, err = s.CreateIndex(context.Background(), req) + assert.NoError(t, merr.CheckRPCCall(resp, err)) + + // duplicated index with same params + req = &indexpb.CreateIndexRequest{ + FieldID: 0, + IndexName: "a", + IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_String))}, {Key: common.JSONPathKey, Value: "json[\"a\"]"}}, + } + resp, err = s.CreateIndex(context.Background(), req) + assert.NoError(t, merr.CheckRPCCall(resp, err)) + + // duplicated index with different cast type + req = &indexpb.CreateIndexRequest{ + FieldID: 0, + IndexName: "a", + IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "json[\"a\"]"}}, + } + resp, err = s.CreateIndex(context.Background(), req) + assert.Error(t, merr.CheckRPCCall(resp, err)) + + // duplicated index with different index name + req = &indexpb.CreateIndexRequest{ + FieldID: 0, + IndexName: "b", + IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "json[\"a\"]"}}, + } + resp, err = s.CreateIndex(context.Background(), req) + assert.Error(t, merr.CheckRPCCall(resp, err)) + + // another field json index with same index name + req = &indexpb.CreateIndexRequest{ + FieldID: 0, + IndexName: "a", + IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "json[\"b\"]"}}, + } + resp, err = s.CreateIndex(context.Background(), req) + assert.Error(t, merr.CheckRPCCall(resp, err)) + + // lack of json params + req = &indexpb.CreateIndexRequest{ + FieldID: 0, + IndexName: "a", + IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONPathKey, Value: "json[\"a\"]"}}, + } + resp, err = s.CreateIndex(context.Background(), req) + assert.Error(t, merr.CheckRPCCall(resp, err)) + + // incorrect field name in json path + req = &indexpb.CreateIndexRequest{ + FieldID: 1, + IndexName: "c", + IndexParams: []*commonpb.KeyValuePair{{Key: common.JSONCastTypeKey, Value: strconv.Itoa(int(schemapb.DataType_Int16))}, {Key: common.JSONPathKey, Value: "bad_json[\"a\"]"}}, + } + resp, err = s.CreateIndex(context.Background(), req) + assert.Error(t, merr.CheckRPCCall(resp, err)) + +} diff --git a/internal/mocks/util/mock_segcore/mock_data.go b/internal/mocks/util/mock_segcore/mock_data.go index 0d7ed4b137626..22195bc9dc2b1 100644 --- a/internal/mocks/util/mock_segcore/mock_data.go +++ b/internal/mocks/util/mock_segcore/mock_data.go @@ -817,6 +817,7 @@ func GenAndSaveIndexV2(collectionID, partitionID, segmentID, buildID int64, IndexParams: indexInfo.GetIndexParams(), IndexFilePaths: indexPaths, CurrentIndexVersion: indexVersion.CurrentIndexVersion, + IndexID: indexInfo.GetIndexID(), }, nil } diff --git a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go index 970ef6fb4b9a1..0acb123d6d504 100644 --- a/internal/querycoordv2/checkers/index_checker.go +++ b/internal/querycoordv2/checkers/index_checker.go @@ -164,7 +164,7 @@ func (c *IndexChecker) checkSegment(segment *meta.Segment, indexInfos []*indexpb var result []int64 for _, indexInfo := range indexInfos { fieldID, indexID := indexInfo.FieldID, indexInfo.IndexID - info, ok := segment.IndexInfo[fieldID] + info, ok := segment.IndexInfo[indexID] if !ok { result = append(result, fieldID) continue diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index 85519b7770360..0231c63f0b8a5 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -124,7 +124,7 @@ type Segment struct { Node int64 // Node the segment is in Version int64 // Version is the timestamp of loading segment LastDeltaTimestamp uint64 // The timestamp of the last delta record - IndexInfo map[int64]*querypb.FieldIndexInfo // index info of loaded segment + IndexInfo map[int64]*querypb.FieldIndexInfo // index info of loaded segment, indexID -> FieldIndexInfo } func SegmentFromInfo(info *datapb.SegmentInfo) *Segment { diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index 1af3012ed3038..7107a801fbeee 100644 --- a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -411,6 +411,54 @@ func (_c *MockSegment_GetIndex_Call) RunAndReturn(run func(int64) *IndexedFieldI return _c } +// GetIndexByID provides a mock function with given fields: indexID +func (_m *MockSegment) GetIndexByID(indexID int64) *IndexedFieldInfo { + ret := _m.Called(indexID) + + if len(ret) == 0 { + panic("no return value specified for GetIndexByID") + } + + var r0 *IndexedFieldInfo + if rf, ok := ret.Get(0).(func(int64) *IndexedFieldInfo); ok { + r0 = rf(indexID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*IndexedFieldInfo) + } + } + + return r0 +} + +// MockSegment_GetIndexByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIndexByID' +type MockSegment_GetIndexByID_Call struct { + *mock.Call +} + +// GetIndexByID is a helper method to define mock.On call +// - indexID int64 +func (_e *MockSegment_Expecter) GetIndexByID(indexID interface{}) *MockSegment_GetIndexByID_Call { + return &MockSegment_GetIndexByID_Call{Call: _e.mock.On("GetIndexByID", indexID)} +} + +func (_c *MockSegment_GetIndexByID_Call) Run(run func(indexID int64)) *MockSegment_GetIndexByID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockSegment_GetIndexByID_Call) Return(_a0 *IndexedFieldInfo) *MockSegment_GetIndexByID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_GetIndexByID_Call) RunAndReturn(run func(int64) *IndexedFieldInfo) *MockSegment_GetIndexByID_Call { + _c.Call.Return(run) + return _c +} + // HasRawData provides a mock function with given fields: fieldID func (_m *MockSegment) HasRawData(fieldID int64) bool { ret := _m.Called(fieldID) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 41188c1139bd5..da1a0f1f8e31c 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -278,7 +278,7 @@ type LocalSegment struct { lastDeltaTimestamp *atomic.Uint64 fields *typeutil.ConcurrentMap[int64, *FieldInfo] - fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] + fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] // indexID -> IndexedFieldInfo } func NewSegment(ctx context.Context, @@ -362,13 +362,14 @@ func (s *LocalSegment) initializeSegment() error { indexedFieldInfos, fieldBinlogs := separateIndexAndBinlog(loadInfo) schemaHelper, _ := typeutil.CreateSchemaHelper(s.collection.Schema()) - for fieldID, info := range indexedFieldInfos { + for _, info := range indexedFieldInfos { + fieldID := info.IndexInfo.FieldID field, err := schemaHelper.GetFieldFromID(fieldID) if err != nil { return err } indexInfo := info.IndexInfo - s.fieldIndexes.Insert(indexInfo.GetFieldID(), &IndexedFieldInfo{ + s.fieldIndexes.Insert(indexInfo.GetIndexID(), &IndexedFieldInfo{ FieldBinlog: &datapb.FieldBinlog{ FieldID: indexInfo.GetFieldID(), }, @@ -452,17 +453,32 @@ func (s *LocalSegment) LastDeltaTimestamp() uint64 { return s.lastDeltaTimestamp.Load() } +func (s *LocalSegment) GetIndexByID(indexID int64) *IndexedFieldInfo { + info, _ := s.fieldIndexes.Get(indexID) + return info +} + func (s *LocalSegment) GetIndex(fieldID int64) *IndexedFieldInfo { - info, _ := s.fieldIndexes.Get(fieldID) + var info *IndexedFieldInfo + s.fieldIndexes.Range(func(key int64, value *IndexedFieldInfo) bool { + if value.IndexInfo.FieldID == fieldID { + info = value + } + return info == nil + }) return info } func (s *LocalSegment) ExistIndex(fieldID int64) bool { - fieldInfo, ok := s.fieldIndexes.Get(fieldID) - if !ok { - return false - } - return fieldInfo.IndexInfo != nil + contain := false + s.fieldIndexes.Range(func(key int64, value *IndexedFieldInfo) bool { + if value.IndexInfo.FieldID == fieldID { + contain = true + } + return !contain + }) + + return contain } func (s *LocalSegment) HasRawData(fieldID int64) bool { @@ -981,9 +997,9 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn zap.Int64("indexID", indexInfo.GetIndexID()), ) - old := s.GetIndex(indexInfo.GetFieldID()) + old := s.GetIndex(indexInfo.GetIndexID()) // the index loaded - if old != nil && old.IndexInfo.GetIndexID() == indexInfo.GetIndexID() && old.IsLoaded { + if old != nil && old.IsLoaded { log.Warn("index already loaded") return nil } @@ -1115,7 +1131,7 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F return err } - s.fieldIndexes.Insert(indexInfo.GetFieldID(), &IndexedFieldInfo{ + s.fieldIndexes.Insert(indexInfo.GetIndexID(), &IndexedFieldInfo{ FieldBinlog: &datapb.FieldBinlog{ FieldID: indexInfo.GetFieldID(), }, diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index 400886ccd5edf..16ec822fa67ab 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -71,6 +71,7 @@ type Segment interface { ResourceUsageEstimate() ResourceUsage // Index related + GetIndexByID(indexID int64) *IndexedFieldInfo GetIndex(fieldID int64) *IndexedFieldInfo ExistIndex(fieldID int64) bool Indexes() []*IndexedFieldInfo diff --git a/internal/querynodev2/segments/segment_l0.go b/internal/querynodev2/segments/segment_l0.go index cab1f64b7645a..66473b2564659 100644 --- a/internal/querynodev2/segments/segment_l0.go +++ b/internal/querynodev2/segments/segment_l0.go @@ -109,6 +109,10 @@ func (s *L0Segment) GetIndex(fieldID int64) *IndexedFieldInfo { return nil } +func (s *L0Segment) GetIndexByID(indexID int64) *IndexedFieldInfo { + return nil +} + func (s *L0Segment) ExistIndex(fieldID int64) bool { return false } diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 6e883ab30781a..70d4ada1fbf7a 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -649,11 +649,11 @@ func (loader *segmentLoader) LoadBloomFilterSet(ctx context.Context, collectionI } func separateIndexAndBinlog(loadInfo *querypb.SegmentLoadInfo) (map[int64]*IndexedFieldInfo, []*datapb.FieldBinlog) { - fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo) + fieldID2IndexInfo := make(map[int64][]*querypb.FieldIndexInfo) for _, indexInfo := range loadInfo.IndexInfos { if len(indexInfo.GetIndexFilePaths()) > 0 { fieldID := indexInfo.FieldID - fieldID2IndexInfo[fieldID] = indexInfo + fieldID2IndexInfo[fieldID] = append(fieldID2IndexInfo[fieldID], indexInfo) } } @@ -664,11 +664,13 @@ func separateIndexAndBinlog(loadInfo *querypb.SegmentLoadInfo) (map[int64]*Index fieldID := fieldBinlog.FieldID // check num rows of data meta and index meta are consistent if indexInfo, ok := fieldID2IndexInfo[fieldID]; ok { - fieldInfo := &IndexedFieldInfo{ - FieldBinlog: fieldBinlog, - IndexInfo: indexInfo, + for _, index := range indexInfo { + fieldInfo := &IndexedFieldInfo{ + FieldBinlog: fieldBinlog, + IndexInfo: index, + } + indexedFieldInfos[index.IndexID] = fieldInfo } - indexedFieldInfos[fieldID] = fieldInfo } else { fieldBinlogs = append(fieldBinlogs, fieldBinlog) } @@ -683,11 +685,11 @@ func separateLoadInfoV2(loadInfo *querypb.SegmentLoadInfo, schema *schemapb.Coll map[int64]*datapb.TextIndexStats, // text indexed info map[int64]struct{}, // unindexed text fields ) { - fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo) + fieldID2IndexInfo := make(map[int64][]*querypb.FieldIndexInfo) for _, indexInfo := range loadInfo.IndexInfos { if len(indexInfo.GetIndexFilePaths()) > 0 { fieldID := indexInfo.FieldID - fieldID2IndexInfo[fieldID] = indexInfo + fieldID2IndexInfo[fieldID] = append(fieldID2IndexInfo[fieldID], indexInfo) } } @@ -697,12 +699,14 @@ func separateLoadInfoV2(loadInfo *querypb.SegmentLoadInfo, schema *schemapb.Coll for _, fieldBinlog := range loadInfo.BinlogPaths { fieldID := fieldBinlog.FieldID // check num rows of data meta and index meta are consistent - if indexInfo, ok := fieldID2IndexInfo[fieldID]; ok { - fieldInfo := &IndexedFieldInfo{ - FieldBinlog: fieldBinlog, - IndexInfo: indexInfo, + if infos, ok := fieldID2IndexInfo[fieldID]; ok { + for _, indexInfo := range infos { + fieldInfo := &IndexedFieldInfo{ + FieldBinlog: fieldBinlog, + IndexInfo: indexInfo, + } + indexedFieldInfos[indexInfo.IndexID] = fieldInfo } - indexedFieldInfos[fieldID] = fieldInfo } else { fieldBinlogs = append(fieldBinlogs, fieldBinlog) } @@ -759,7 +763,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu log := log.Ctx(ctx).With(zap.Int64("segmentID", segment.ID())) tr := timerecord.NewTimeRecorder("segmentLoader.loadSealedSegment") log.Info("Start loading fields...", - zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)), + // zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)), zap.Int64s("indexed text fields", lo.Keys(textIndexes)), zap.Int64s("unindexed text fields", lo.Keys(unindexedTextFields)), ) @@ -770,7 +774,8 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(loadFieldsIndexSpan.Milliseconds())) // 2. complement raw data for the scalar fields without raw data - for fieldID, info := range indexedFieldInfos { + for _, info := range indexedFieldInfos { + fieldID := info.IndexInfo.FieldID field, err := schemaHelper.GetFieldFromID(fieldID) if err != nil { return err @@ -1011,7 +1016,8 @@ func (loader *segmentLoader) loadFieldsIndex(ctx context.Context, zap.Int64("rowCount", numRows), ) - for fieldID, fieldInfo := range indexedFieldInfos { + for _, fieldInfo := range indexedFieldInfos { + fieldID := fieldInfo.IndexInfo.FieldID indexInfo := fieldInfo.IndexInfo tr := timerecord.NewTimeRecorder("loadFieldIndex") err := loader.loadFieldIndex(ctx, segment, indexInfo) @@ -1272,7 +1278,7 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment, func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *LocalSegment, loadInfo *querypb.SegmentLoadInfo) error { var needReset bool - segment.fieldIndexes.Range(func(fieldID int64, info *IndexedFieldInfo) bool { + segment.fieldIndexes.Range(func(indexID int64, info *IndexedFieldInfo) bool { for _, info := range info.FieldBinlog.GetBinlogs() { if info.GetEntriesNum() == 0 { needReset = true @@ -1322,7 +1328,7 @@ func (loader *segmentLoader) patchEntryNumber(ctx context.Context, segment *Loca } var err error - segment.fieldIndexes.Range(func(fieldID int64, info *IndexedFieldInfo) bool { + segment.fieldIndexes.Range(func(indexID int64, info *IndexedFieldInfo) bool { if len(info.FieldBinlog.GetBinlogs()) != len(counts) { err = errors.New("rowID & index binlog number not matched") return false diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 446f5fded4fa0..f0b0af04abf37 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1170,7 +1170,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get IsSorted: s.IsSorted(), LastDeltaTimestamp: s.LastDeltaTimestamp(), IndexInfo: lo.SliceToMap(s.Indexes(), func(info *segments.IndexedFieldInfo) (int64, *querypb.FieldIndexInfo) { - return info.IndexInfo.FieldID, info.IndexInfo + return info.IndexInfo.IndexID, info.IndexInfo }), }) } diff --git a/internal/util/indexparamcheck/inverted_checker.go b/internal/util/indexparamcheck/inverted_checker.go index 83a0c65cddbd1..5690562db91fd 100644 --- a/internal/util/indexparamcheck/inverted_checker.go +++ b/internal/util/indexparamcheck/inverted_checker.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -13,13 +15,23 @@ type INVERTEDChecker struct { } func (c *INVERTEDChecker) CheckTrain(dataType schemapb.DataType, params map[string]string) error { + // check json index params + isJSONIndex := typeutil.IsJSONType(dataType) + if isJSONIndex { + if _, exist := params[common.JSONCastTypeKey]; !exist { + return merr.WrapErrParameterMissing(common.JSONCastTypeKey, "json index must specify cast type") + } + if _, exist := params[common.JSONPathKey]; !exist { + return merr.WrapErrParameterMissing(common.JSONPathKey, "json index must specify json path") + } + } return c.scalarIndexChecker.CheckTrain(dataType, params) } func (c *INVERTEDChecker) CheckValidDataType(indexType IndexType, field *schemapb.FieldSchema) error { dType := field.GetDataType() if !typeutil.IsBoolType(dType) && !typeutil.IsArithmetic(dType) && !typeutil.IsStringType(dType) && - !typeutil.IsArrayType(dType) { + !typeutil.IsArrayType(dType) && !typeutil.IsJSONType(dType) { return fmt.Errorf("INVERTED are not supported on %s field", dType.String()) } return nil diff --git a/internal/util/indexparamcheck/inverted_checker_test.go b/internal/util/indexparamcheck/inverted_checker_test.go index 68afa3903b3d9..c56e1a1578ace 100644 --- a/internal/util/indexparamcheck/inverted_checker_test.go +++ b/internal/util/indexparamcheck/inverted_checker_test.go @@ -19,7 +19,7 @@ func Test_INVERTEDIndexChecker(t *testing.T) { assert.NoError(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_Int64})) assert.NoError(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_Float})) assert.NoError(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_Array})) + assert.NoError(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_JSON})) - assert.Error(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_JSON})) assert.Error(t, c.CheckValidDataType(IndexINVERTED, &schemapb.FieldSchema{DataType: schemapb.DataType_FloatVector})) } diff --git a/pkg/common/common.go b/pkg/common/common.go index be6237a9729b1..36364deb05872 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -140,6 +140,9 @@ const ( IgnoreGrowing = "ignore_growing" ConsistencyLevel = "consistency_level" HintsKey = "hints" + + JSONCastTypeKey = "json_cast_type" + JSONPathKey = "json_path" ) // Doc-in-doc-out diff --git a/tests/go_client/testcases/index_test.go b/tests/go_client/testcases/index_test.go index 98748ef5e57ef..692156c9ffa36 100644 --- a/tests/go_client/testcases/index_test.go +++ b/tests/go_client/testcases/index_test.go @@ -615,7 +615,6 @@ func TestCreateIndexJsonField(t *testing.T) { errMsg string } inxError := []scalarIndexError{ - {index.NewInvertedIndex(), "INVERTED are not supported on JSON field"}, {index.NewSortedIndex(), "STL_SORT are only supported on numeric field"}, {index.NewTrieIndex(), "TRIE are only supported on varchar field"}, } diff --git a/tests/python_client/testcases/test_index.py b/tests/python_client/testcases/test_index.py index 66afe37830e8f..39979fa179b8f 100644 --- a/tests/python_client/testcases/test_index.py +++ b/tests/python_client/testcases/test_index.py @@ -21,6 +21,7 @@ from utils.util_pymilvus import * from common.constants import * from pymilvus.exceptions import MilvusException +from pymilvus import DataType prefix = "index" default_schema = cf.gen_default_collection_schema() @@ -1288,14 +1289,11 @@ def test_create_inverted_index_on_json_field(self, vector_data_type): """ target: test create scalar index on json field method: 1.create collection, and create index - expected: Raise exception + expected: success """ collection_w = self.init_collection_general(prefix, is_index=False, vector_data_type=vector_data_type)[0] - scalar_index_params = {"index_type": "INVERTED"} - collection_w.create_index(ct.default_json_field_name, index_params=scalar_index_params, - check_task=CheckTasks.err_res, - check_items={ct.err_code: 1100, - ct.err_msg: "INVERTED are not supported on JSON field"}) + scalar_index_params = {"index_type": "INVERTED", "json_cast_type": DataType.INT32, "json_path": ct.default_json_field_name+"['a']"} + collection_w.create_index(ct.default_json_field_name, index_params=scalar_index_params) @pytest.mark.tags(CaseLabel.L1) def test_create_inverted_index_on_array_field(self):