From 0c5d8660aa978dc97ded9ff2a7a5f3171353b7c8 Mon Sep 17 00:00:00 2001 From: Jiquan Long Date: Fri, 31 May 2024 09:47:47 +0800 Subject: [PATCH] feat: support inverted index for array (#33452) issue: https://github.com/milvus-io/milvus/issues/27704 --------- Signed-off-by: longjiquan --- .../src/exec/expression/JsonContainsExpr.cpp | 98 ++++- .../src/exec/expression/JsonContainsExpr.h | 7 + internal/core/src/expr/ITypeExpr.h | 17 +- internal/core/src/index/IndexFactory.cpp | 105 +++--- internal/core/src/index/IndexFactory.h | 16 +- .../core/src/index/InvertedIndexTantivy.cpp | 342 ++++++++---------- .../core/src/index/InvertedIndexTantivy.h | 18 +- internal/core/src/index/TantivyConfig.h | 51 --- internal/core/src/indexbuilder/IndexFactory.h | 1 + internal/core/src/indexbuilder/index_c.cpp | 3 +- internal/core/src/pb/CMakeLists.txt | 8 +- internal/core/src/segcore/Types.h | 1 + internal/core/src/segcore/load_index_c.cpp | 51 ++- internal/core/src/segcore/load_index_c.h | 5 + internal/core/src/storage/Types.h | 1 + .../core/thirdparty/tantivy/CMakeLists.txt | 6 + internal/core/thirdparty/tantivy/ffi_demo.cpp | 17 + .../tantivy-binding/include/tantivy-binding.h | 18 + .../tantivy/tantivy-binding/src/demo_c.rs | 14 + .../tantivy-binding/src/index_writer.rs | 76 +++- .../tantivy-binding/src/index_writer_c.rs | 74 ++++ .../tantivy/tantivy-binding/src/lib.rs | 1 + .../core/thirdparty/tantivy/tantivy-wrapper.h | 65 ++++ internal/core/thirdparty/tantivy/test.cpp | 74 ++++ .../core/unittest/test_inverted_index.cpp | 25 +- internal/core/unittest/test_scalar_index.cpp | 28 +- internal/proto/cgo_msg.proto | 23 ++ .../querynodev2/segments/load_index_info.go | 32 ++ internal/querynodev2/segments/segment.go | 50 ++- pkg/util/indexparamcheck/inverted_checker.go | 3 +- .../indexparamcheck/inverted_checker_test.go | 2 +- scripts/generate_proto.sh | 3 + tests/python_client/testcases/test_index.py | 5 +- 33 files changed, 875 insertions(+), 365 deletions(-) delete mode 100644 internal/core/src/index/TantivyConfig.h create mode 100644 internal/core/thirdparty/tantivy/ffi_demo.cpp create mode 100644 internal/core/thirdparty/tantivy/tantivy-binding/src/demo_c.rs create mode 100644 internal/proto/cgo_msg.proto diff --git a/internal/core/src/exec/expression/JsonContainsExpr.cpp b/internal/core/src/exec/expression/JsonContainsExpr.cpp index 72251c301fb14..bbcc852c2a8e2 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.cpp +++ b/internal/core/src/exec/expression/JsonContainsExpr.cpp @@ -23,7 +23,14 @@ namespace exec { void PhyJsonContainsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { switch (expr_->column_.data_type_) { - case DataType::ARRAY: + case DataType::ARRAY: { + if (is_index_mode_) { + result = EvalArrayContainsForIndexSegment(); + } else { + result = EvalJsonContainsForDataSegment(); + } + break; + } case DataType::JSON: { if (is_index_mode_) { PanicInfo( @@ -94,7 +101,6 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment() { return ExecJsonContainsWithDiffType(); } } - break; } case proto::plan::JSONContainsExpr_JSONOp_ContainsAll: { if (IsArrayDataType(data_type)) { @@ -145,7 +151,6 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment() { return ExecJsonContainsAllWithDiffType(); } } - break; } default: PanicInfo(ExprInvalid, @@ -748,5 +753,92 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType() { return res_vec; } +VectorPtr +PhyJsonContainsFilterExpr::EvalArrayContainsForIndexSegment() { + switch (expr_->column_.element_type_) { + case DataType::BOOL: { + return ExecArrayContainsForIndexSegmentImpl(); + } + case DataType::INT8: { + return ExecArrayContainsForIndexSegmentImpl(); + } + case DataType::INT16: { + return ExecArrayContainsForIndexSegmentImpl(); + } + case DataType::INT32: { + return ExecArrayContainsForIndexSegmentImpl(); + } + case DataType::INT64: { + return ExecArrayContainsForIndexSegmentImpl(); + } + case DataType::FLOAT: { + return ExecArrayContainsForIndexSegmentImpl(); + } + case DataType::DOUBLE: { + return ExecArrayContainsForIndexSegmentImpl(); + } + case DataType::VARCHAR: + case DataType::STRING: { + return ExecArrayContainsForIndexSegmentImpl(); + } + default: + PanicInfo(DataTypeInvalid, + fmt::format("unsupported data type for " + "ExecArrayContainsForIndexSegmentImpl: {}", + expr_->column_.element_type_)); + } +} + +template +VectorPtr +PhyJsonContainsFilterExpr::ExecArrayContainsForIndexSegmentImpl() { + typedef std::conditional_t, + std::string, + ExprValueType> + GetType; + using Index = index::ScalarIndex; + auto real_batch_size = GetNextBatchSize(); + if (real_batch_size == 0) { + return nullptr; + } + + std::unordered_set elements; + for (auto const& element : expr_->vals_) { + elements.insert(GetValueFromProto(element)); + } + boost::container::vector elems(elements.begin(), elements.end()); + auto execute_sub_batch = + [this](Index* index_ptr, + const boost::container::vector& vals) { + switch (expr_->op_) { + case proto::plan::JSONContainsExpr_JSONOp_Contains: + case proto::plan::JSONContainsExpr_JSONOp_ContainsAny: { + return index_ptr->In(vals.size(), vals.data()); + } + case proto::plan::JSONContainsExpr_JSONOp_ContainsAll: { + TargetBitmap result(index_ptr->Count()); + result.set(); + for (size_t i = 0; i < vals.size(); i++) { + auto sub = index_ptr->In(1, &vals[i]); + result &= sub; + } + return result; + } + default: + PanicInfo( + ExprInvalid, + "unsupported array contains type {}", + proto::plan::JSONContainsExpr_JSONOp_Name(expr_->op_)); + } + }; + auto res = ProcessIndexChunks(execute_sub_batch, elems); + AssertInfo(res.size() == real_batch_size, + "internal error: expr processed rows {} not equal " + "expect batch size {}", + res.size(), + real_batch_size); + return std::make_shared(std::move(res)); +} + } //namespace exec } // namespace milvus diff --git a/internal/core/src/exec/expression/JsonContainsExpr.h b/internal/core/src/exec/expression/JsonContainsExpr.h index c757dc0d3fb92..a0cfdfdea0841 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.h +++ b/internal/core/src/exec/expression/JsonContainsExpr.h @@ -80,6 +80,13 @@ class PhyJsonContainsFilterExpr : public SegmentExpr { VectorPtr ExecJsonContainsWithDiffType(); + VectorPtr + EvalArrayContainsForIndexSegment(); + + template + VectorPtr + ExecArrayContainsForIndexSegmentImpl(); + private: std::shared_ptr expr_; }; diff --git a/internal/core/src/expr/ITypeExpr.h b/internal/core/src/expr/ITypeExpr.h index 102709aa16b83..6716f8af2f66f 100644 --- a/internal/core/src/expr/ITypeExpr.h +++ b/internal/core/src/expr/ITypeExpr.h @@ -113,11 +113,13 @@ IsMaterializedViewSupported(const DataType& data_type) { struct ColumnInfo { FieldId field_id_; DataType data_type_; + DataType element_type_; std::vector nested_path_; ColumnInfo(const proto::plan::ColumnInfo& column_info) : field_id_(column_info.field_id()), data_type_(static_cast(column_info.data_type())), + element_type_(static_cast(column_info.element_type())), nested_path_(column_info.nested_path().begin(), column_info.nested_path().end()) { } @@ -127,6 +129,7 @@ struct ColumnInfo { std::vector nested_path = {}) : field_id_(field_id), data_type_(data_type), + element_type_(DataType::NONE), nested_path_(std::move(nested_path)) { } @@ -140,6 +143,10 @@ struct ColumnInfo { return false; } + if (element_type_ != other.element_type_) { + return false; + } + for (int i = 0; i < nested_path_.size(); ++i) { if (nested_path_[i] != other.nested_path_[i]) { return false; @@ -151,10 +158,12 @@ struct ColumnInfo { std::string ToString() const { - return fmt::format("[FieldId:{}, data_type:{}, nested_path:{}]", - std::to_string(field_id_.get()), - data_type_, - milvus::Join(nested_path_, ",")); + return fmt::format( + "[FieldId:{}, data_type:{}, element_type:{}, nested_path:{}]", + std::to_string(field_id_.get()), + data_type_, + element_type_, + milvus::Join(nested_path_, ",")); } }; diff --git a/internal/core/src/index/IndexFactory.cpp b/internal/core/src/index/IndexFactory.cpp index 79409056d980e..cc660324f1b5a 100644 --- a/internal/core/src/index/IndexFactory.cpp +++ b/internal/core/src/index/IndexFactory.cpp @@ -35,13 +35,9 @@ template ScalarIndexPtr IndexFactory::CreateScalarIndex( const IndexType& index_type, - const storage::FileManagerContext& file_manager_context, - DataType d_type) { + const storage::FileManagerContext& file_manager_context) { if (index_type == INVERTED_INDEX_TYPE) { - TantivyConfig cfg; - cfg.data_type_ = d_type; - return std::make_unique>(cfg, - file_manager_context); + return std::make_unique>(file_manager_context); } if (index_type == BITMAP_INDEX_TYPE) { return std::make_unique>(file_manager_context); @@ -60,14 +56,11 @@ template <> ScalarIndexPtr IndexFactory::CreateScalarIndex( const IndexType& index_type, - const storage::FileManagerContext& file_manager_context, - DataType d_type) { + const storage::FileManagerContext& file_manager_context) { #if defined(__linux__) || defined(__APPLE__) if (index_type == INVERTED_INDEX_TYPE) { - TantivyConfig cfg; - cfg.data_type_ = d_type; return std::make_unique>( - cfg, file_manager_context); + file_manager_context); } if (index_type == BITMAP_INDEX_TYPE) { return std::make_unique>( @@ -84,13 +77,10 @@ ScalarIndexPtr IndexFactory::CreateScalarIndex( const IndexType& index_type, const storage::FileManagerContext& file_manager_context, - std::shared_ptr space, - DataType d_type) { + std::shared_ptr space) { if (index_type == INVERTED_INDEX_TYPE) { - TantivyConfig cfg; - cfg.data_type_ = d_type; - return std::make_unique>( - cfg, file_manager_context, space); + return std::make_unique>(file_manager_context, + space); } if (index_type == BITMAP_INDEX_TYPE) { return std::make_unique>(file_manager_context, @@ -104,14 +94,11 @@ ScalarIndexPtr IndexFactory::CreateScalarIndex( const IndexType& index_type, const storage::FileManagerContext& file_manager_context, - std::shared_ptr space, - DataType d_type) { + std::shared_ptr space) { #if defined(__linux__) || defined(__APPLE__) if (index_type == INVERTED_INDEX_TYPE) { - TantivyConfig cfg; - cfg.data_type_ = d_type; return std::make_unique>( - cfg, file_manager_context, space); + file_manager_context, space); } if (index_type == BITMAP_INDEX_TYPE) { return std::make_unique>( @@ -148,41 +135,32 @@ IndexFactory::CreateIndex( } IndexBasePtr -IndexFactory::CreateScalarIndex( - const CreateIndexInfo& create_index_info, +IndexFactory::CreatePrimitiveScalarIndex( + DataType data_type, + IndexType index_type, const storage::FileManagerContext& file_manager_context) { - auto data_type = create_index_info.field_type; - auto index_type = create_index_info.index_type; - switch (data_type) { // create scalar index case DataType::BOOL: - return CreateScalarIndex( - index_type, file_manager_context, data_type); + return CreateScalarIndex(index_type, file_manager_context); case DataType::INT8: - return CreateScalarIndex( - index_type, file_manager_context, data_type); + return CreateScalarIndex(index_type, file_manager_context); case DataType::INT16: - return CreateScalarIndex( - index_type, file_manager_context, data_type); + return CreateScalarIndex(index_type, file_manager_context); case DataType::INT32: - return CreateScalarIndex( - index_type, file_manager_context, data_type); + return CreateScalarIndex(index_type, file_manager_context); case DataType::INT64: - return CreateScalarIndex( - index_type, file_manager_context, data_type); + return CreateScalarIndex(index_type, file_manager_context); case DataType::FLOAT: - return CreateScalarIndex( - index_type, file_manager_context, data_type); + return CreateScalarIndex(index_type, file_manager_context); case DataType::DOUBLE: - return CreateScalarIndex( - index_type, file_manager_context, data_type); + return CreateScalarIndex(index_type, file_manager_context); // create string index case DataType::STRING: case DataType::VARCHAR: - return CreateScalarIndex( - index_type, file_manager_context, data_type); + return CreateScalarIndex(index_type, + file_manager_context); default: throw SegcoreError( DataTypeInvalid, @@ -190,6 +168,24 @@ IndexFactory::CreateScalarIndex( } } +IndexBasePtr +IndexFactory::CreateScalarIndex( + const CreateIndexInfo& create_index_info, + const storage::FileManagerContext& file_manager_context) { + switch (create_index_info.field_type) { + case DataType::ARRAY: + return CreatePrimitiveScalarIndex( + static_cast( + file_manager_context.fieldDataMeta.schema.element_type()), + create_index_info.index_type, + file_manager_context); + default: + return CreatePrimitiveScalarIndex(create_index_info.field_type, + create_index_info.index_type, + file_manager_context); + } +} + IndexBasePtr IndexFactory::CreateVectorIndex( const CreateIndexInfo& create_index_info, @@ -257,32 +253,25 @@ IndexFactory::CreateScalarIndex(const CreateIndexInfo& create_index_info, switch (data_type) { // create scalar index case DataType::BOOL: - return CreateScalarIndex( - index_type, file_manager, space, data_type); + return CreateScalarIndex(index_type, file_manager, space); case DataType::INT8: - return CreateScalarIndex( - index_type, file_manager, space, data_type); + return CreateScalarIndex(index_type, file_manager, space); case DataType::INT16: - return CreateScalarIndex( - index_type, file_manager, space, data_type); + return CreateScalarIndex(index_type, file_manager, space); case DataType::INT32: - return CreateScalarIndex( - index_type, file_manager, space, data_type); + return CreateScalarIndex(index_type, file_manager, space); case DataType::INT64: - return CreateScalarIndex( - index_type, file_manager, space, data_type); + return CreateScalarIndex(index_type, file_manager, space); case DataType::FLOAT: - return CreateScalarIndex( - index_type, file_manager, space, data_type); + return CreateScalarIndex(index_type, file_manager, space); case DataType::DOUBLE: - return CreateScalarIndex( - index_type, file_manager, space, data_type); + return CreateScalarIndex(index_type, file_manager, space); // create string index case DataType::STRING: case DataType::VARCHAR: return CreateScalarIndex( - index_type, file_manager, space, data_type); + index_type, file_manager, space); default: throw SegcoreError( DataTypeInvalid, diff --git a/internal/core/src/index/IndexFactory.h b/internal/core/src/index/IndexFactory.h index 75bd090292907..47b255ab4e912 100644 --- a/internal/core/src/index/IndexFactory.h +++ b/internal/core/src/index/IndexFactory.h @@ -65,6 +65,13 @@ class IndexFactory { CreateVectorIndex(const CreateIndexInfo& create_index_info, const storage::FileManagerContext& file_manager_context); + IndexBasePtr + CreatePrimitiveScalarIndex( + DataType data_type, + IndexType index_type, + const storage::FileManagerContext& file_manager_context = + storage::FileManagerContext()); + IndexBasePtr CreateScalarIndex(const CreateIndexInfo& create_index_info, const storage::FileManagerContext& file_manager_context = @@ -89,15 +96,13 @@ class IndexFactory { ScalarIndexPtr CreateScalarIndex(const IndexType& index_type, const storage::FileManagerContext& file_manager = - storage::FileManagerContext(), - DataType d_type = DataType::NONE); + storage::FileManagerContext()); template ScalarIndexPtr CreateScalarIndex(const IndexType& index_type, const storage::FileManagerContext& file_manager, - std::shared_ptr space, - DataType d_type = DataType::NONE); + std::shared_ptr space); }; // template <> @@ -112,6 +117,5 @@ ScalarIndexPtr IndexFactory::CreateScalarIndex( const IndexType& index_type, const storage::FileManagerContext& file_manager_context, - std::shared_ptr space, - DataType d_type); + std::shared_ptr space); } // namespace milvus::index diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp index 2c212704aaf49..f09297dd33269 100644 --- a/internal/core/src/index/InvertedIndexTantivy.cpp +++ b/internal/core/src/index/InvertedIndexTantivy.cpp @@ -23,12 +23,50 @@ #include "InvertedIndexTantivy.h" namespace milvus::index { +inline TantivyDataType +get_tantivy_data_type(proto::schema::DataType data_type) { + switch (data_type) { + case proto::schema::DataType::Bool: { + return TantivyDataType::Bool; + } + + case proto::schema::DataType::Int8: + case proto::schema::DataType::Int16: + case proto::schema::DataType::Int32: + case proto::schema::DataType::Int64: { + return TantivyDataType::I64; + } + + case proto::schema::DataType::Float: + case proto::schema::DataType::Double: { + return TantivyDataType::F64; + } + + case proto::schema::DataType::VarChar: { + return TantivyDataType::Keyword; + } + + default: + PanicInfo(ErrorCode::NotImplemented, + fmt::format("not implemented data type: {}", data_type)); + } +} + +inline TantivyDataType +get_tantivy_data_type(const proto::schema::FieldSchema& schema) { + switch (schema.data_type()) { + case proto::schema::Array: + return get_tantivy_data_type(schema.element_type()); + default: + return get_tantivy_data_type(schema.data_type()); + } +} + template InvertedIndexTantivy::InvertedIndexTantivy( - const TantivyConfig& cfg, const storage::FileManagerContext& ctx, std::shared_ptr space) - : cfg_(cfg), space_(space) { + : space_(space), schema_(ctx.fieldDataMeta.schema) { mem_file_manager_ = std::make_shared(ctx, ctx.space_); disk_file_manager_ = std::make_shared(ctx, ctx.space_); auto field = @@ -36,7 +74,7 @@ InvertedIndexTantivy::InvertedIndexTantivy( auto prefix = disk_file_manager_->GetLocalIndexObjectPrefix(); path_ = prefix; boost::filesystem::create_directories(path_); - d_type_ = cfg_.to_tantivy_data_type(); + d_type_ = get_tantivy_data_type(schema_); if (tantivy_index_exist(path_.c_str())) { LOG_INFO( "index {} already exists, which should happen in loading progress", @@ -114,83 +152,7 @@ InvertedIndexTantivy::Build(const Config& config) { AssertInfo(insert_files.has_value(), "insert_files were empty"); auto field_datas = mem_file_manager_->CacheRawDataToMemory(insert_files.value()); - switch (cfg_.data_type_) { - case DataType::BOOL: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data(static_cast(data->Data()), - n); - } - break; - } - - case DataType::INT8: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::INT16: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::INT32: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::INT64: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::FLOAT: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::DOUBLE: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::VARCHAR: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - default: - PanicInfo(ErrorCode::NotImplemented, - fmt::format("todo: not supported, {}", cfg_.data_type_)); - } + build_index(field_datas); } template @@ -211,84 +173,7 @@ InvertedIndexTantivy::BuildV2(const Config& config) { field_data->FillFieldData(col_data); field_datas.push_back(field_data); } - - switch (cfg_.data_type_) { - case DataType::BOOL: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data(static_cast(data->Data()), - n); - } - break; - } - - case DataType::INT8: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::INT16: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::INT32: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::INT64: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::FLOAT: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::DOUBLE: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - case DataType::VARCHAR: { - for (const auto& data : field_datas) { - auto n = data->get_num_rows(); - wrapper_->add_data( - static_cast(data->Data()), n); - } - break; - } - - default: - PanicInfo(ErrorCode::NotImplemented, - fmt::format("todo: not supported, {}", cfg_.data_type_)); - } + build_index(field_datas); } template @@ -333,7 +218,8 @@ InvertedIndexTantivy::In(size_t n, const T* values) { template const TargetBitmap InvertedIndexTantivy::NotIn(size_t n, const T* values) { - TargetBitmap bitset(Count(), true); + TargetBitmap bitset(Count()); + bitset.set(); for (size_t i = 0; i < n; ++i) { auto array = wrapper_->term_query(values[i]); apply_hits(bitset, array, false); @@ -425,51 +311,107 @@ void InvertedIndexTantivy::BuildWithRawData(size_t n, const void* values, const Config& config) { - if constexpr (!std::is_same_v) { - TantivyConfig cfg; - if constexpr (std::is_same_v) { - cfg.data_type_ = DataType::INT8; - } - if constexpr (std::is_same_v) { - cfg.data_type_ = DataType::INT16; + if constexpr (std::is_same_v) { + schema_.set_data_type(proto::schema::DataType::Int8); + } + if constexpr (std::is_same_v) { + schema_.set_data_type(proto::schema::DataType::Int16); + } + if constexpr (std::is_same_v) { + schema_.set_data_type(proto::schema::DataType::Int32); + } + if constexpr (std::is_same_v) { + schema_.set_data_type(proto::schema::DataType::Int64); + } + if constexpr (std::is_same_v) { + schema_.set_data_type(proto::schema::DataType::Float); + } + if constexpr (std::is_same_v) { + schema_.set_data_type(proto::schema::DataType::Double); + } + if constexpr (std::is_same_v) { + schema_.set_data_type(proto::schema::DataType::VarChar); + } + boost::uuids::random_generator generator; + auto uuid = generator(); + auto prefix = boost::uuids::to_string(uuid); + path_ = fmt::format("/tmp/{}", prefix); + boost::filesystem::create_directories(path_); + d_type_ = get_tantivy_data_type(schema_); + std::string field = "test_inverted_index"; + wrapper_ = std::make_shared( + field.c_str(), d_type_, path_.c_str()); + wrapper_->add_data(static_cast(values), n); + finish(); +} + +template +void +InvertedIndexTantivy::build_index( + const std::vector>& field_datas) { + switch (schema_.data_type()) { + case proto::schema::DataType::Bool: + case proto::schema::DataType::Int8: + case proto::schema::DataType::Int16: + case proto::schema::DataType::Int32: + case proto::schema::DataType::Int64: + case proto::schema::DataType::Float: + case proto::schema::DataType::Double: + case proto::schema::DataType::String: + case proto::schema::DataType::VarChar: { + for (const auto& data : field_datas) { + auto n = data->get_num_rows(); + wrapper_->add_data(static_cast(data->Data()), n); + } + break; } - if constexpr (std::is_same_v) { - cfg.data_type_ = DataType::INT32; + + case proto::schema::DataType::Array: { + build_index_for_array(field_datas); + break; } - if constexpr (std::is_same_v) { - cfg.data_type_ = DataType::INT64; + + default: + PanicInfo(ErrorCode::NotImplemented, + fmt::format("Inverted index not supported on {}", + schema_.data_type())); + } +} + +template +void +InvertedIndexTantivy::build_index_for_array( + const std::vector>& field_datas) { + for (const auto& data : field_datas) { + auto n = data->get_num_rows(); + auto array_column = static_cast(data->Data()); + for (int64_t i = 0; i < n; i++) { + assert(array_column[i].get_element_type() == + static_cast(schema_.element_type())); + wrapper_->template add_multi_data( + reinterpret_cast(array_column[i].data()), + array_column[i].length()); } - if constexpr (std::is_same_v) { - cfg.data_type_ = DataType::VARCHAR; + } +} + +template <> +void +InvertedIndexTantivy::build_index_for_array( + const std::vector>& field_datas) { + for (const auto& data : field_datas) { + auto n = data->get_num_rows(); + auto array_column = static_cast(data->Data()); + for (int64_t i = 0; i < n; i++) { + assert(array_column[i].get_element_type() == + static_cast(schema_.element_type())); + std::vector output; + for (int64_t j = 0; j < array_column[i].length(); j++) { + output.push_back( + array_column[i].template get_data(j)); + } + wrapper_->template add_multi_data(output.data(), output.size()); } - boost::uuids::random_generator generator; - auto uuid = generator(); - auto prefix = boost::uuids::to_string(uuid); - path_ = fmt::format("/tmp/{}", prefix); - boost::filesystem::create_directories(path_); - cfg_ = cfg; - d_type_ = cfg_.to_tantivy_data_type(); - std::string field = "test_inverted_index"; - wrapper_ = std::make_shared( - field.c_str(), d_type_, path_.c_str()); - wrapper_->add_data(static_cast(values), n); - finish(); - } else { - boost::uuids::random_generator generator; - auto uuid = generator(); - auto prefix = boost::uuids::to_string(uuid); - path_ = fmt::format("/tmp/{}", prefix); - boost::filesystem::create_directories(path_); - cfg_ = TantivyConfig{ - .data_type_ = DataType::VARCHAR, - }; - d_type_ = cfg_.to_tantivy_data_type(); - std::string field = "test_inverted_index"; - wrapper_ = std::make_shared( - field.c_str(), d_type_, path_.c_str()); - wrapper_->add_data(static_cast(values), - n); - finish(); } } diff --git a/internal/core/src/index/InvertedIndexTantivy.h b/internal/core/src/index/InvertedIndexTantivy.h index 0ea2f64d869d3..cc0178804c343 100644 --- a/internal/core/src/index/InvertedIndexTantivy.h +++ b/internal/core/src/index/InvertedIndexTantivy.h @@ -18,7 +18,6 @@ #include "tantivy-binding.h" #include "tantivy-wrapper.h" #include "index/StringIndex.h" -#include "index/TantivyConfig.h" #include "storage/space.h" namespace milvus::index { @@ -36,13 +35,11 @@ class InvertedIndexTantivy : public ScalarIndex { InvertedIndexTantivy() = default; - explicit InvertedIndexTantivy(const TantivyConfig& cfg, - const storage::FileManagerContext& ctx) - : InvertedIndexTantivy(cfg, ctx, nullptr) { + explicit InvertedIndexTantivy(const storage::FileManagerContext& ctx) + : InvertedIndexTantivy(ctx, nullptr) { } - explicit InvertedIndexTantivy(const TantivyConfig& cfg, - const storage::FileManagerContext& ctx, + explicit InvertedIndexTantivy(const storage::FileManagerContext& ctx, std::shared_ptr space); ~InvertedIndexTantivy(); @@ -160,11 +157,18 @@ class InvertedIndexTantivy : public ScalarIndex { void finish(); + void + build_index(const std::vector>& field_datas); + + void + build_index_for_array( + const std::vector>& field_datas); + private: std::shared_ptr wrapper_; - TantivyConfig cfg_; TantivyDataType d_type_; std::string path_; + proto::schema::FieldSchema schema_; /* * To avoid IO amplification, we use both mem file manager & disk file manager diff --git a/internal/core/src/index/TantivyConfig.h b/internal/core/src/index/TantivyConfig.h deleted file mode 100644 index 355b4c76efc9d..0000000000000 --- a/internal/core/src/index/TantivyConfig.h +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License - -#pragma once - -#include "storage/Types.h" -#include "tantivy-binding.h" - -namespace milvus::index { -struct TantivyConfig { - DataType data_type_; - - TantivyDataType - to_tantivy_data_type() { - switch (data_type_) { - case DataType::BOOL: { - return TantivyDataType::Bool; - } - - case DataType::INT8: - case DataType::INT16: - case DataType::INT32: - case DataType::INT64: { - return TantivyDataType::I64; - } - - case DataType::FLOAT: - case DataType::DOUBLE: { - return TantivyDataType::F64; - } - - case DataType::VARCHAR: { - return TantivyDataType::Keyword; - } - - default: - PanicInfo( - ErrorCode::NotImplemented, - fmt::format("not implemented data type: {}", data_type_)); - } - } -}; -} // namespace milvus::index \ No newline at end of file diff --git a/internal/core/src/indexbuilder/IndexFactory.h b/internal/core/src/indexbuilder/IndexFactory.h index cd361499b4065..1380a6e9817d3 100644 --- a/internal/core/src/indexbuilder/IndexFactory.h +++ b/internal/core/src/indexbuilder/IndexFactory.h @@ -60,6 +60,7 @@ class IndexFactory { case DataType::DOUBLE: case DataType::VARCHAR: case DataType::STRING: + case DataType::ARRAY: return CreateScalarIndex(type, config, context); case DataType::VECTOR_FLOAT: diff --git a/internal/core/src/indexbuilder/index_c.cpp b/internal/core/src/indexbuilder/index_c.cpp index ae319cc26d61f..7ccaf7c414a24 100644 --- a/internal/core/src/indexbuilder/index_c.cpp +++ b/internal/core/src/indexbuilder/index_c.cpp @@ -190,7 +190,8 @@ CreateIndex(CIndex* res_index, build_index_info->collectionid(), build_index_info->partitionid(), build_index_info->segmentid(), - build_index_info->field_schema().fieldid()}; + build_index_info->field_schema().fieldid(), + build_index_info->field_schema()}; milvus::storage::IndexMeta index_meta{ build_index_info->segmentid(), diff --git a/internal/core/src/pb/CMakeLists.txt b/internal/core/src/pb/CMakeLists.txt index 3c00203cf4c25..35726d9c24c65 100644 --- a/internal/core/src/pb/CMakeLists.txt +++ b/internal/core/src/pb/CMakeLists.txt @@ -11,12 +11,10 @@ find_package(Protobuf REQUIRED) +file(GLOB_RECURSE milvus_proto_srcs + "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") add_library(milvus_proto STATIC - common.pb.cc - index_cgo_msg.pb.cc - plan.pb.cc - schema.pb.cc - segcore.pb.cc + ${milvus_proto_srcs} ) message(STATUS "milvus proto sources: " ${milvus_proto_srcs}) diff --git a/internal/core/src/segcore/Types.h b/internal/core/src/segcore/Types.h index 73ba7fcb188b6..106799ce2610f 100644 --- a/internal/core/src/segcore/Types.h +++ b/internal/core/src/segcore/Types.h @@ -46,6 +46,7 @@ struct LoadIndexInfo { std::string uri; int64_t index_store_version; IndexVersion index_engine_version; + proto::schema::FieldSchema schema; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index 7f851948545d3..3df3a92879751 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -25,6 +25,7 @@ #include "storage/Util.h" #include "storage/RemoteChunkManagerSingleton.h" #include "storage/LocalChunkManagerSingleton.h" +#include "pb/cgo_msg.pb.h" bool IsLoadWithDisk(const char* index_type, int index_engine_version) { @@ -258,7 +259,8 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) { load_index_info->collection_id, load_index_info->partition_id, load_index_info->segment_id, - load_index_info->field_id}; + load_index_info->field_id, + load_index_info->schema}; milvus::storage::IndexMeta index_meta{load_index_info->segment_id, load_index_info->field_id, load_index_info->index_build_id, @@ -484,3 +486,50 @@ AppendStorageInfo(CLoadIndexInfo c_load_index_info, load_index_info->uri = uri; load_index_info->index_store_version = version; } + +CStatus +FinishLoadIndexInfo(CLoadIndexInfo c_load_index_info, + const uint8_t* serialized_load_index_info, + const uint64_t len) { + try { + auto info_proto = std::make_unique(); + info_proto->ParseFromArray(serialized_load_index_info, len); + auto load_index_info = + static_cast(c_load_index_info); + // TODO: keep this since LoadIndexInfo is used by SegmentSealed. + { + load_index_info->collection_id = info_proto->collectionid(); + load_index_info->partition_id = info_proto->partitionid(); + load_index_info->segment_id = info_proto->segmentid(); + load_index_info->field_id = info_proto->field().fieldid(); + load_index_info->field_type = + static_cast(info_proto->field().data_type()); + load_index_info->enable_mmap = info_proto->enable_mmap(); + load_index_info->mmap_dir_path = info_proto->mmap_dir_path(); + load_index_info->index_id = info_proto->indexid(); + load_index_info->index_build_id = info_proto->index_buildid(); + load_index_info->index_version = info_proto->index_version(); + for (const auto& [k, v] : info_proto->index_params()) { + load_index_info->index_params[k] = v; + } + load_index_info->index_files.assign( + info_proto->index_files().begin(), + info_proto->index_files().end()); + load_index_info->uri = info_proto->uri(); + load_index_info->index_store_version = + info_proto->index_store_version(); + load_index_info->index_engine_version = + info_proto->index_engine_version(); + load_index_info->schema = info_proto->field(); + } + auto status = CStatus(); + status.error_code = milvus::Success; + status.error_msg = ""; + return status; + } catch (std::exception& e) { + auto status = CStatus(); + status.error_code = milvus::UnexpectedError; + status.error_msg = strdup(e.what()); + return status; + } +} diff --git a/internal/core/src/segcore/load_index_c.h b/internal/core/src/segcore/load_index_c.h index 7a3d89b797670..8755aa7396162 100644 --- a/internal/core/src/segcore/load_index_c.h +++ b/internal/core/src/segcore/load_index_c.h @@ -76,6 +76,11 @@ void AppendStorageInfo(CLoadIndexInfo c_load_index_info, const char* uri, int64_t version); + +CStatus +FinishLoadIndexInfo(CLoadIndexInfo c_load_index_info, + const uint8_t* serialized_load_index_info, + const uint64_t len); #ifdef __cplusplus } #endif diff --git a/internal/core/src/storage/Types.h b/internal/core/src/storage/Types.h index 924873dccda64..fbd72d0a59a78 100644 --- a/internal/core/src/storage/Types.h +++ b/internal/core/src/storage/Types.h @@ -64,6 +64,7 @@ struct FieldDataMeta { int64_t partition_id; int64_t segment_id; int64_t field_id; + proto::schema::FieldSchema schema; }; enum CodecType { diff --git a/internal/core/thirdparty/tantivy/CMakeLists.txt b/internal/core/thirdparty/tantivy/CMakeLists.txt index f4d928922874f..c1435a032a85e 100644 --- a/internal/core/thirdparty/tantivy/CMakeLists.txt +++ b/internal/core/thirdparty/tantivy/CMakeLists.txt @@ -71,3 +71,9 @@ target_link_libraries(bench_tantivy boost_filesystem dl ) + +add_executable(ffi_demo ffi_demo.cpp) +target_link_libraries(ffi_demo + tantivy_binding + dl + ) diff --git a/internal/core/thirdparty/tantivy/ffi_demo.cpp b/internal/core/thirdparty/tantivy/ffi_demo.cpp new file mode 100644 index 0000000000000..1626d655f175d --- /dev/null +++ b/internal/core/thirdparty/tantivy/ffi_demo.cpp @@ -0,0 +1,17 @@ +#include +#include + +#include "tantivy-binding.h" + +int +main(int argc, char* argv[]) { + std::vector data{"data1", "data2", "data3"}; + std::vector datas{}; + for (auto& s : data) { + datas.push_back(s.c_str()); + } + + print_vector_of_strings(datas.data(), datas.size()); + + return 0; +} diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h index 3b22018bf047e..045d4a50e6a2c 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h +++ b/internal/core/thirdparty/tantivy/tantivy-binding/include/tantivy-binding.h @@ -97,6 +97,24 @@ void tantivy_index_add_bools(void *ptr, const bool *array, uintptr_t len); void tantivy_index_add_keyword(void *ptr, const char *s); +void tantivy_index_add_multi_int8s(void *ptr, const int8_t *array, uintptr_t len); + +void tantivy_index_add_multi_int16s(void *ptr, const int16_t *array, uintptr_t len); + +void tantivy_index_add_multi_int32s(void *ptr, const int32_t *array, uintptr_t len); + +void tantivy_index_add_multi_int64s(void *ptr, const int64_t *array, uintptr_t len); + +void tantivy_index_add_multi_f32s(void *ptr, const float *array, uintptr_t len); + +void tantivy_index_add_multi_f64s(void *ptr, const double *array, uintptr_t len); + +void tantivy_index_add_multi_bools(void *ptr, const bool *array, uintptr_t len); + +void tantivy_index_add_multi_keywords(void *ptr, const char *const *array, uintptr_t len); + bool tantivy_index_exist(const char *path); +void print_vector_of_strings(const char *const *ptr, uintptr_t len); + } // extern "C" diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/demo_c.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/demo_c.rs new file mode 100644 index 0000000000000..257a41f17a891 --- /dev/null +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/demo_c.rs @@ -0,0 +1,14 @@ +use std::{ffi::{c_char, CStr}, slice}; + +#[no_mangle] +pub extern "C" fn print_vector_of_strings(ptr: *const *const c_char, len: usize) { + let arr : &[*const c_char] = unsafe { + slice::from_raw_parts(ptr, len) + }; + for element in arr { + let c_str = unsafe { + CStr::from_ptr(*element) + }; + println!("{}", c_str.to_str().unwrap()); + } +} \ No newline at end of file diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs index ce96a5b4d5a30..2c8d56bf38694 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer.rs @@ -1,10 +1,11 @@ -use futures::executor::block_on; +use std::ffi::CStr; +use libc::c_char; use tantivy::schema::{Field, IndexRecordOption, Schema, TextFieldIndexing, TextOptions, INDEXED}; -use tantivy::{doc, tokenizer, Index, IndexWriter, SingleSegmentIndexWriter}; +use tantivy::{doc, tokenizer, Index, SingleSegmentIndexWriter, Document}; use crate::data_type::TantivyDataType; -use crate::index_writer; + use crate::log::init_log; pub struct IndexWriterWrapper { @@ -98,7 +99,74 @@ impl IndexWriterWrapper { .unwrap(); } - pub fn finish(mut self) { + pub fn add_multi_i8s(&mut self, datas: &[i8]) { + let mut document = Document::default(); + for data in datas { + document.add_field_value(self.field, *data as i64); + } + self.index_writer.add_document(document).unwrap(); + } + + pub fn add_multi_i16s(&mut self, datas: &[i16]) { + let mut document = Document::default(); + for data in datas { + document.add_field_value(self.field, *data as i64); + } + self.index_writer.add_document(document).unwrap(); + } + + pub fn add_multi_i32s(&mut self, datas: &[i32]) { + let mut document = Document::default(); + for data in datas { + document.add_field_value(self.field, *data as i64); + } + self.index_writer.add_document(document).unwrap(); + } + + pub fn add_multi_i64s(&mut self, datas: &[i64]) { + let mut document = Document::default(); + for data in datas { + document.add_field_value(self.field, *data); + } + self.index_writer.add_document(document).unwrap(); + } + + pub fn add_multi_f32s(&mut self, datas: &[f32]) { + let mut document = Document::default(); + for data in datas { + document.add_field_value(self.field, *data as f64); + } + self.index_writer.add_document(document).unwrap(); + } + + pub fn add_multi_f64s(&mut self, datas: &[f64]) { + let mut document = Document::default(); + for data in datas { + document.add_field_value(self.field, *data); + } + self.index_writer.add_document(document).unwrap(); + } + + pub fn add_multi_bools(&mut self, datas: &[bool]) { + let mut document = Document::default(); + for data in datas { + document.add_field_value(self.field, *data); + } + self.index_writer.add_document(document).unwrap(); + } + + pub fn add_multi_keywords(&mut self, datas: &[*const c_char]) { + let mut document = Document::default(); + for element in datas { + let data = unsafe { + CStr::from_ptr(*element) + }; + document.add_field_value(self.field, data.to_str().unwrap()); + } + self.index_writer.add_document(document).unwrap(); + } + + pub fn finish(self) { self.index_writer .finalize() .expect("failed to build inverted index"); diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs index c8822781158e8..b13f550d7cb00 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/index_writer_c.rs @@ -122,3 +122,77 @@ pub extern "C" fn tantivy_index_add_keyword(ptr: *mut c_void, s: *const c_char) let c_str = unsafe { CStr::from_ptr(s) }; unsafe { (*real).add_keyword(c_str.to_str().unwrap()) } } + +// --------------------------------------------- array ------------------------------------------ + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_int8s(ptr: *mut c_void, array: *const i8, len: usize) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len); + (*real).add_multi_i8s(arr) + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_int16s(ptr: *mut c_void, array: *const i16, len: usize) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len) ; + (*real).add_multi_i16s(arr); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_int32s(ptr: *mut c_void, array: *const i32, len: usize) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len) ; + (*real).add_multi_i32s(arr); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_int64s(ptr: *mut c_void, array: *const i64, len: usize) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len) ; + (*real).add_multi_i64s(arr); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_f32s(ptr: *mut c_void, array: *const f32, len: usize) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len) ; + (*real).add_multi_f32s(arr); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_f64s(ptr: *mut c_void, array: *const f64, len: usize) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len) ; + (*real).add_multi_f64s(arr); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_bools(ptr: *mut c_void, array: *const bool, len: usize) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len) ; + (*real).add_multi_bools(arr); + } +} + +#[no_mangle] +pub extern "C" fn tantivy_index_add_multi_keywords(ptr: *mut c_void, array: *const *const c_char, len: usize) { + let real = ptr as *mut IndexWriterWrapper; + unsafe { + let arr = slice::from_raw_parts(array, len); + (*real).add_multi_keywords(arr) + } +} diff --git a/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs b/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs index aa069cb3b32b6..c6193de3f6908 100644 --- a/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs +++ b/internal/core/thirdparty/tantivy/tantivy-binding/src/lib.rs @@ -10,6 +10,7 @@ mod log; mod util; mod util_c; mod vec_collector; +mod demo_c; pub fn add(left: usize, right: usize) -> usize { left + right diff --git a/internal/core/thirdparty/tantivy/tantivy-wrapper.h b/internal/core/thirdparty/tantivy/tantivy-wrapper.h index 358f14ea49ed0..3076f502aee21 100644 --- a/internal/core/thirdparty/tantivy/tantivy-wrapper.h +++ b/internal/core/thirdparty/tantivy/tantivy-wrapper.h @@ -1,5 +1,7 @@ #include #include +#include +#include #include "tantivy-binding.h" namespace milvus::tantivy { @@ -49,6 +51,15 @@ struct RustArrayWrapper { std::cout << ss.str() << std::endl; } + std::set + to_set() { + std::set s; + for (int i = 0; i < array_.len; i++) { + s.insert(array_.array[i]); + } + return s; + } + RustArray array_; private: @@ -186,6 +197,60 @@ struct TantivyIndexWrapper { typeid(T).name()); } + template + void + add_multi_data(const T* array, uintptr_t len) { + assert(!finished_); + + if constexpr (std::is_same_v) { + tantivy_index_add_multi_bools(writer_, array, len); + return; + } + + if constexpr (std::is_same_v) { + tantivy_index_add_multi_int8s(writer_, array, len); + return; + } + + if constexpr (std::is_same_v) { + tantivy_index_add_multi_int16s(writer_, array, len); + return; + } + + if constexpr (std::is_same_v) { + tantivy_index_add_multi_int32s(writer_, array, len); + return; + } + + if constexpr (std::is_same_v) { + tantivy_index_add_multi_int64s(writer_, array, len); + return; + } + + if constexpr (std::is_same_v) { + tantivy_index_add_multi_f32s(writer_, array, len); + return; + } + + if constexpr (std::is_same_v) { + tantivy_index_add_multi_f64s(writer_, array, len); + return; + } + + if constexpr (std::is_same_v) { + std::vector views; + for (uintptr_t i = 0; i < len; i++) { + views.push_back(array[i].c_str()); + } + tantivy_index_add_multi_keywords(writer_, views.data(), len); + return; + } + + throw fmt::format( + "InvertedIndex.add_multi_data: unsupported data type: {}", + typeid(T).name()); + } + inline void finish() { if (!finished_) { diff --git a/internal/core/thirdparty/tantivy/test.cpp b/internal/core/thirdparty/tantivy/test.cpp index 1c67a69673a5c..602ea3449f0a2 100644 --- a/internal/core/thirdparty/tantivy/test.cpp +++ b/internal/core/thirdparty/tantivy/test.cpp @@ -200,6 +200,77 @@ test_32717() { } } +template +std::map> +build_inverted_index(const std::vector>& vec_of_array) { + std::map> inverted_index; + for (uint32_t i = 0; i < vec_of_array.size(); i++) { + for (const auto& term : vec_of_array[i]) { + inverted_index[term].insert(i); + } + } + return inverted_index; +} + +void +test_array_int() { + using T = int64_t; + + auto path = "/tmp/inverted-index/test-binding/"; + boost::filesystem::remove_all(path); + boost::filesystem::create_directories(path); + auto w = TantivyIndexWrapper("test_field_name", guess_data_type(), path); + + std::vector> vec_of_array{ + {10, 40, 50}, + {20, 50}, + {10, 50, 60}, + }; + + for (const auto& arr : vec_of_array) { + w.add_multi_data(arr.data(), arr.size()); + } + w.finish(); + + assert(w.count() == vec_of_array.size()); + + auto inverted_index = build_inverted_index(vec_of_array); + for (const auto& [term, posting_list] : inverted_index) { + auto hits = w.term_query(term).to_set(); + assert(posting_list == hits); + } +} + +void +test_array_string() { + using T = std::string; + + auto path = "/tmp/inverted-index/test-binding/"; + boost::filesystem::remove_all(path); + boost::filesystem::create_directories(path); + auto w = + TantivyIndexWrapper("test_field_name", TantivyDataType::Keyword, path); + + std::vector> vec_of_array{ + {"10", "40", "50"}, + {"20", "50"}, + {"10", "50", "60"}, + }; + + for (const auto& arr : vec_of_array) { + w.add_multi_data(arr.data(), arr.size()); + } + w.finish(); + + assert(w.count() == vec_of_array.size()); + + auto inverted_index = build_inverted_index(vec_of_array); + for (const auto& [term, posting_list] : inverted_index) { + auto hits = w.term_query(term).to_set(); + assert(posting_list == hits); + } +} + int main(int argc, char* argv[]) { test_32717(); @@ -216,5 +287,8 @@ main(int argc, char* argv[]) { run(); + test_array_int(); + test_array_string(); + return 0; } diff --git a/internal/core/unittest/test_inverted_index.cpp b/internal/core/unittest/test_inverted_index.cpp index eeddfe6e9d81a..d01813ab94e6a 100644 --- a/internal/core/unittest/test_inverted_index.cpp +++ b/internal/core/unittest/test_inverted_index.cpp @@ -32,13 +32,20 @@ auto gen_field_meta(int64_t collection_id = 1, int64_t partition_id = 2, int64_t segment_id = 3, - int64_t field_id = 101) -> storage::FieldDataMeta { - return storage::FieldDataMeta{ + int64_t field_id = 101, + DataType data_type = DataType::NONE, + DataType element_type = DataType::NONE) + -> storage::FieldDataMeta { + auto meta = storage::FieldDataMeta{ .collection_id = collection_id, .partition_id = partition_id, .segment_id = segment_id, .field_id = field_id, }; + meta.schema.set_data_type(static_cast(data_type)); + meta.schema.set_element_type( + static_cast(element_type)); + return meta; } auto @@ -86,7 +93,7 @@ struct ChunkManagerWrapper { }; } // namespace milvus::test -template +template void test_run() { int64_t collection_id = 1; @@ -96,8 +103,8 @@ test_run() { int64_t index_build_id = 1000; int64_t index_version = 10000; - auto field_meta = - test::gen_field_meta(collection_id, partition_id, segment_id, field_id); + auto field_meta = test::gen_field_meta( + collection_id, partition_id, segment_id, field_id, dtype, element_type); auto index_meta = test::gen_index_meta( segment_id, field_id, index_build_id, index_version); @@ -305,8 +312,12 @@ test_string() { int64_t index_build_id = 1000; int64_t index_version = 10000; - auto field_meta = - test::gen_field_meta(collection_id, partition_id, segment_id, field_id); + auto field_meta = test::gen_field_meta(collection_id, + partition_id, + segment_id, + field_id, + dtype, + DataType::NONE); auto index_meta = test::gen_index_meta( segment_id, field_id, index_build_id, index_version); diff --git a/internal/core/unittest/test_scalar_index.cpp b/internal/core/unittest/test_scalar_index.cpp index 2967523daf365..9a99bec26a272 100644 --- a/internal/core/unittest/test_scalar_index.cpp +++ b/internal/core/unittest/test_scalar_index.cpp @@ -53,6 +53,14 @@ TYPED_TEST_P(TypedScalarIndexTest, Dummy) { std::cout << milvus::GetDType() << std::endl; } +auto +GetTempFileManagerCtx(CDataType data_type) { + auto ctx = milvus::storage::FileManagerContext(); + ctx.fieldDataMeta.schema.set_data_type( + static_cast(data_type)); + return ctx; +} + TYPED_TEST_P(TypedScalarIndexTest, Constructor) { using T = TypeParam; auto dtype = milvus::GetDType(); @@ -63,7 +71,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Constructor) { create_index_info.index_type = index_type; auto index = milvus::index::IndexFactory::GetInstance().CreateScalarIndex( - create_index_info); + create_index_info, GetTempFileManagerCtx(dtype)); } } @@ -77,7 +85,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Count) { create_index_info.index_type = index_type; auto index = milvus::index::IndexFactory::GetInstance().CreateScalarIndex( - create_index_info); + create_index_info, GetTempFileManagerCtx(dtype)); auto scalar_index = dynamic_cast*>(index.get()); auto arr = GenSortedArr(nb); @@ -96,7 +104,7 @@ TYPED_TEST_P(TypedScalarIndexTest, HasRawData) { create_index_info.index_type = index_type; auto index = milvus::index::IndexFactory::GetInstance().CreateScalarIndex( - create_index_info); + create_index_info, GetTempFileManagerCtx(dtype)); auto scalar_index = dynamic_cast*>(index.get()); auto arr = GenSortedArr(nb); @@ -116,7 +124,7 @@ TYPED_TEST_P(TypedScalarIndexTest, In) { create_index_info.index_type = index_type; auto index = milvus::index::IndexFactory::GetInstance().CreateScalarIndex( - create_index_info); + create_index_info, GetTempFileManagerCtx(dtype)); auto scalar_index = dynamic_cast*>(index.get()); auto arr = GenSortedArr(nb); @@ -135,7 +143,7 @@ TYPED_TEST_P(TypedScalarIndexTest, NotIn) { create_index_info.index_type = index_type; auto index = milvus::index::IndexFactory::GetInstance().CreateScalarIndex( - create_index_info); + create_index_info, GetTempFileManagerCtx(dtype)); auto scalar_index = dynamic_cast*>(index.get()); auto arr = GenSortedArr(nb); @@ -154,7 +162,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Reverse) { create_index_info.index_type = index_type; auto index = milvus::index::IndexFactory::GetInstance().CreateScalarIndex( - create_index_info); + create_index_info, GetTempFileManagerCtx(dtype)); auto scalar_index = dynamic_cast*>(index.get()); auto arr = GenSortedArr(nb); @@ -173,7 +181,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Range) { create_index_info.index_type = index_type; auto index = milvus::index::IndexFactory::GetInstance().CreateScalarIndex( - create_index_info); + create_index_info, GetTempFileManagerCtx(dtype)); auto scalar_index = dynamic_cast*>(index.get()); auto arr = GenSortedArr(nb); @@ -192,7 +200,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Codec) { create_index_info.index_type = index_type; auto index = milvus::index::IndexFactory::GetInstance().CreateScalarIndex( - create_index_info); + create_index_info, GetTempFileManagerCtx(dtype)); auto scalar_index = dynamic_cast*>(index.get()); auto arr = GenSortedArr(nb); @@ -201,7 +209,7 @@ TYPED_TEST_P(TypedScalarIndexTest, Codec) { auto binary_set = index->Serialize(nullptr); auto copy_index = milvus::index::IndexFactory::GetInstance().CreateScalarIndex( - create_index_info); + create_index_info, GetTempFileManagerCtx(dtype)); copy_index->Load(binary_set); auto copy_scalar_index = @@ -372,6 +380,8 @@ TYPED_TEST_P(TypedScalarIndexTestV2, Base) { auto space = TestSpace(temp_path, vec_size, dataset, scalars); milvus::storage::FileManagerContext file_manager_context( {}, {.field_name = "scalar"}, chunk_manager, space); + file_manager_context.fieldDataMeta.schema.set_data_type( + static_cast(dtype)); auto index = milvus::index::IndexFactory::GetInstance().CreateScalarIndex( create_index_info, file_manager_context, space); diff --git a/internal/proto/cgo_msg.proto b/internal/proto/cgo_msg.proto new file mode 100644 index 0000000000000..6d851e95e0550 --- /dev/null +++ b/internal/proto/cgo_msg.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package milvus.proto.cgo; +option go_package="github.com/milvus-io/milvus/internal/proto/cgopb"; + +import "schema.proto"; + +message LoadIndexInfo { + int64 collectionID = 1; + int64 partitionID = 2; + int64 segmentID = 3; + schema.FieldSchema field = 5; + bool enable_mmap = 6; + string mmap_dir_path = 7; + int64 indexID = 8; + int64 index_buildID = 9; + int64 index_version = 10; + map index_params = 11; + repeated string index_files = 12; + string uri = 13; + int64 index_store_version = 14; + int32 index_engine_version = 15; +} diff --git a/internal/querynodev2/segments/load_index_info.go b/internal/querynodev2/segments/load_index_info.go index c5c1572475c40..04632bed95f2d 100644 --- a/internal/querynodev2/segments/load_index_info.go +++ b/internal/querynodev2/segments/load_index_info.go @@ -29,11 +29,13 @@ import ( "runtime" "unsafe" + "github.com/golang/protobuf/proto" "github.com/pingcap/log" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datacoord" + "github.com/milvus-io/milvus/internal/proto/cgopb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/params" "github.com/milvus-io/milvus/pkg/common" @@ -245,3 +247,33 @@ func (li *LoadIndexInfo) appendIndexEngineVersion(ctx context.Context, indexEngi return HandleCStatus(ctx, &status, "AppendIndexEngineVersion failed") } + +func (li *LoadIndexInfo) finish(ctx context.Context, info *cgopb.LoadIndexInfo) error { + marshaled, err := proto.Marshal(info) + if err != nil { + return err + } + + var status C.CStatus + _, _ = GetDynamicPool().Submit(func() (any, error) { + status = C.FinishLoadIndexInfo(li.cLoadIndexInfo, (*C.uint8_t)(unsafe.Pointer(&marshaled[0])), (C.uint64_t)(len(marshaled))) + return nil, nil + }).Await() + + if err := HandleCStatus(ctx, &status, "FinishLoadIndexInfo failed"); err != nil { + return err + } + + _, _ = GetLoadPool().Submit(func() (any, error) { + if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { + status = C.AppendIndexV3(li.cLoadIndexInfo) + } else { + traceCtx := ParseCTraceContext(ctx) + status = C.AppendIndexV2(traceCtx.ctx, li.cLoadIndexInfo) + runtime.KeepAlive(traceCtx) + } + return nil, nil + }).Await() + + return HandleCStatus(ctx, &status, "AppendIndex failed") +} diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 13ae49c91aa11..3382b4373a364 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -45,6 +45,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" milvus_storage "github.com/milvus-io/milvus-storage/go/storage" "github.com/milvus-io/milvus-storage/go/storage/options" + "github.com/milvus-io/milvus/internal/proto/cgopb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" @@ -56,6 +57,9 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/indexparamcheck" + "github.com/milvus-io/milvus/pkg/util/indexparams" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -1266,18 +1270,58 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn return err } defer deleteLoadIndexInfo(loadIndexInfo) + + schema, err := typeutil.CreateSchemaHelper(s.GetCollection().Schema()) + if err != nil { + return err + } + fieldSchema, err := schema.GetFieldFromID(indexInfo.GetFieldID()) + if err != nil { + return err + } + + indexParams := funcutil.KeyValuePair2Map(indexInfo.IndexParams) + // as Knowhere reports error if encounter an unknown param, we need to delete it + delete(indexParams, common.MmapEnabledKey) + + // some build params also exist in indexParams, which are useless during loading process + if indexParams["index_type"] == indexparamcheck.IndexDISKANN { + if err := indexparams.SetDiskIndexLoadParams(paramtable.Get(), indexParams, indexInfo.GetNumRows()); err != nil { + return err + } + } + + if err := indexparams.AppendPrepareLoadParams(paramtable.Get(), indexParams); err != nil { + return err + } + + indexInfoProto := &cgopb.LoadIndexInfo{ + CollectionID: s.Collection(), + PartitionID: s.Partition(), + SegmentID: s.ID(), + Field: fieldSchema, + EnableMmap: isIndexMmapEnable(indexInfo), + MmapDirPath: paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue(), + IndexID: indexInfo.GetIndexID(), + IndexBuildID: indexInfo.GetBuildID(), + IndexVersion: indexInfo.GetIndexVersion(), + IndexParams: indexParams, + IndexFiles: indexInfo.GetIndexFilePaths(), + IndexEngineVersion: indexInfo.GetCurrentIndexVersion(), + IndexStoreVersion: indexInfo.GetIndexStoreVersion(), + } + if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() { uri, err := typeutil_internal.GetStorageURI(paramtable.Get().CommonCfg.StorageScheme.GetValue(), paramtable.Get().CommonCfg.StoragePathPrefix.GetValue(), s.ID()) if err != nil { return err } - loadIndexInfo.appendStorageInfo(uri, indexInfo.IndexStoreVersion) + indexInfoProto.Uri = uri } newLoadIndexInfoSpan := tr.RecordSpan() // 2. - err = loadIndexInfo.appendLoadIndexInfo(ctx, indexInfo, s.Collection(), s.Partition(), s.ID(), fieldType) - if err != nil { + if err := loadIndexInfo.finish(ctx, indexInfoProto); err != nil { if loadIndexInfo.cleanLocalData(ctx) != nil { log.Warn("failed to clean cached data on disk after append index failed", zap.Int64("buildID", indexInfo.BuildID), diff --git a/pkg/util/indexparamcheck/inverted_checker.go b/pkg/util/indexparamcheck/inverted_checker.go index b15549cd4b7a6..dfc24127d3569 100644 --- a/pkg/util/indexparamcheck/inverted_checker.go +++ b/pkg/util/indexparamcheck/inverted_checker.go @@ -17,7 +17,8 @@ func (c *INVERTEDChecker) CheckTrain(params map[string]string) error { } func (c *INVERTEDChecker) CheckValidDataType(dType schemapb.DataType) error { - if !typeutil.IsBoolType(dType) && !typeutil.IsArithmetic(dType) && !typeutil.IsStringType(dType) { + if !typeutil.IsBoolType(dType) && !typeutil.IsArithmetic(dType) && !typeutil.IsStringType(dType) && + !typeutil.IsArrayType(dType) { return fmt.Errorf("INVERTED are not supported on %s field", dType.String()) } return nil diff --git a/pkg/util/indexparamcheck/inverted_checker_test.go b/pkg/util/indexparamcheck/inverted_checker_test.go index afe41f89f1193..7a31290061490 100644 --- a/pkg/util/indexparamcheck/inverted_checker_test.go +++ b/pkg/util/indexparamcheck/inverted_checker_test.go @@ -18,8 +18,8 @@ func Test_INVERTEDIndexChecker(t *testing.T) { assert.NoError(t, c.CheckValidDataType(schemapb.DataType_Bool)) assert.NoError(t, c.CheckValidDataType(schemapb.DataType_Int64)) assert.NoError(t, c.CheckValidDataType(schemapb.DataType_Float)) + assert.NoError(t, c.CheckValidDataType(schemapb.DataType_Array)) assert.Error(t, c.CheckValidDataType(schemapb.DataType_JSON)) - assert.Error(t, c.CheckValidDataType(schemapb.DataType_Array)) assert.Error(t, c.CheckValidDataType(schemapb.DataType_FloatVector)) } diff --git a/scripts/generate_proto.sh b/scripts/generate_proto.sh index 2551f586c9f9c..286570b842aa8 100755 --- a/scripts/generate_proto.sh +++ b/scripts/generate_proto.sh @@ -44,6 +44,7 @@ pushd ${PROTO_DIR} mkdir -p etcdpb mkdir -p indexcgopb +mkdir -p cgopb mkdir -p internalpb mkdir -p rootcoordpb @@ -62,6 +63,7 @@ protoc_opt="${PROTOC_BIN} --proto_path=${API_PROTO_DIR} --proto_path=." ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./etcdpb etcd_meta.proto || { echo 'generate etcd_meta.proto failed'; exit 1; } ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./indexcgopb index_cgo_msg.proto || { echo 'generate index_cgo_msg failed '; exit 1; } +${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./cgopb cgo_msg.proto || { echo 'generate cgo_msg failed '; exit 1; } ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./rootcoordpb root_coord.proto || { echo 'generate root_coord.proto failed'; exit 1; } ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./internalpb internal.proto || { echo 'generate internal.proto failed'; exit 1; } ${protoc_opt} --go_out=plugins=grpc,paths=source_relative:./proxypb proxy.proto|| { echo 'generate proxy.proto failed'; exit 1; } @@ -78,6 +80,7 @@ ${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb schema.proto|| { echo 'generate sche ${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb common.proto|| { echo 'generate common.proto failed'; exit 1; } ${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb segcore.proto|| { echo 'generate segcore.proto failed'; exit 1; } ${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb index_cgo_msg.proto|| { echo 'generate index_cgo_msg.proto failed'; exit 1; } +${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb cgo_msg.proto|| { echo 'generate cgo_msg.proto failed'; exit 1; } ${protoc_opt} --cpp_out=$CPP_SRC_DIR/src/pb plan.proto|| { echo 'generate plan.proto failed'; exit 1; } popd diff --git a/tests/python_client/testcases/test_index.py b/tests/python_client/testcases/test_index.py index 017ab6ff034db..753fb28cd5b5e 100644 --- a/tests/python_client/testcases/test_index.py +++ b/tests/python_client/testcases/test_index.py @@ -1309,10 +1309,7 @@ def test_create_inverted_index_on_array_field(self): collection_w = self.init_collection_wrap(schema=schema) # 2. create index scalar_index_params = {"index_type": "INVERTED"} - collection_w.create_index(ct.default_int32_array_field_name, index_params=scalar_index_params, - check_task=CheckTasks.err_res, - check_items={ct.err_code: 1100, - ct.err_msg: "create index on Array field is not supported"}) + collection_w.create_index(ct.default_int32_array_field_name, index_params=scalar_index_params) @pytest.mark.tags(CaseLabel.L1) def test_create_inverted_index_no_vector_index(self):