Skip to content

Commit

Permalink
feat: support inverted index for array (milvus-io#33452)
Browse files Browse the repository at this point in the history
Signed-off-by: longjiquan <[email protected]>
  • Loading branch information
longjiquan committed Jun 21, 2024
1 parent d9539ff commit afd767a
Show file tree
Hide file tree
Showing 33 changed files with 879 additions and 343 deletions.
98 changes: 95 additions & 3 deletions internal/core/src/exec/expression/JsonContainsExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ namespace exec {
void
PhyJsonContainsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
switch (expr_->column_.data_type_) {
case DataType::ARRAY:
case DataType::ARRAY: {
if (is_index_mode_) {
result = EvalArrayContainsForIndexSegment();
} else {
result = EvalJsonContainsForDataSegment();
}
break;
}
case DataType::JSON: {
if (is_index_mode_) {
PanicInfo(
Expand Down Expand Up @@ -94,7 +101,6 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment() {
return ExecJsonContainsWithDiffType();
}
}
break;
}
case proto::plan::JSONContainsExpr_JSONOp_ContainsAll: {
if (IsArrayDataType(data_type)) {
Expand Down Expand Up @@ -145,7 +151,6 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment() {
return ExecJsonContainsAllWithDiffType();
}
}
break;
}
default:
PanicInfo(ExprInvalid,
Expand Down Expand Up @@ -748,5 +753,92 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType() {
return res_vec;
}

// Evaluates an array-contains filter against an indexed segment.
//
// Selects the typed implementation ExecArrayContainsForIndexSegmentImpl<T>
// that corresponds to the element type recorded on the expression's column.
// VARCHAR and STRING both map to std::string. Any other element type is
// reported through PanicInfo with DataTypeInvalid.
VectorPtr
PhyJsonContainsFilterExpr::EvalArrayContainsForIndexSegment() {
    const auto element_type = expr_->column_.element_type_;
    switch (element_type) {
        case DataType::BOOL:
            return ExecArrayContainsForIndexSegmentImpl<bool>();
        case DataType::INT8:
            return ExecArrayContainsForIndexSegmentImpl<int8_t>();
        case DataType::INT16:
            return ExecArrayContainsForIndexSegmentImpl<int16_t>();
        case DataType::INT32:
            return ExecArrayContainsForIndexSegmentImpl<int32_t>();
        case DataType::INT64:
            return ExecArrayContainsForIndexSegmentImpl<int64_t>();
        case DataType::FLOAT:
            return ExecArrayContainsForIndexSegmentImpl<float>();
        case DataType::DOUBLE:
            return ExecArrayContainsForIndexSegmentImpl<double>();
        case DataType::VARCHAR:
        case DataType::STRING:
            return ExecArrayContainsForIndexSegmentImpl<std::string>();
        default:
            break;
    }
    // Only reached for element types without a typed implementation above.
    PanicInfo(DataTypeInvalid,
              fmt::format("unsupported data type for "
                          "ExecArrayContainsForIndexSegmentImpl: {}",
                          element_type));
}

// Typed implementation of array-contains evaluation backed by a scalar index.
//
// ExprValueType is the element type of the array column; std::string_view is
// mapped to std::string for storage and lookup.
//
// Returns nullptr when GetNextBatchSize() reports an empty batch. Otherwise
// returns a ColumnVector built from the bitmap produced by
// ProcessIndexChunks, after asserting that the bitmap size equals the batch
// size.
template <typename ExprValueType>
VectorPtr
PhyJsonContainsFilterExpr::ExecArrayContainsForIndexSegmentImpl() {
    using GetType =
        std::conditional_t<std::is_same_v<ExprValueType, std::string_view>,
                           std::string,
                           ExprValueType>;
    using Index = index::ScalarIndex<GetType>;
    // NOTE(review): presumably boost::container::vector is used because
    // std::vector<bool> exposes no data() pointer — confirm.
    using ValueList = boost::container::vector<GetType>;

    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }

    // Collapse duplicate expression values before querying the index.
    std::unordered_set<GetType> unique_values;
    for (const auto& proto_value : expr_->vals_) {
        unique_values.insert(GetValueFromProto<GetType>(proto_value));
    }
    ValueList elems(unique_values.begin(), unique_values.end());

    auto execute_sub_batch = [this](Index* index_ptr, const ValueList& vals) {
        const auto op = expr_->op_;

        // Contains / ContainsAny: one In() lookup over all values.
        if (op == proto::plan::JSONContainsExpr_JSONOp_Contains ||
            op == proto::plan::JSONContainsExpr_JSONOp_ContainsAny) {
            return index_ptr->In(vals.size(), vals.data());
        }

        // ContainsAll: start from an all-set bitmap and intersect it with
        // the In() result of each value individually.
        if (op == proto::plan::JSONContainsExpr_JSONOp_ContainsAll) {
            TargetBitmap result(index_ptr->Count());
            result.set();
            for (const auto& val : vals) {
                auto sub = index_ptr->In(1, &val);
                result &= sub;
            }
            return result;
        }

        PanicInfo(ExprInvalid,
                  "unsupported array contains type {}",
                  proto::plan::JSONContainsExpr_JSONOp_Name(expr_->op_));
    };

    auto res = ProcessIndexChunks<GetType>(execute_sub_batch, elems);
    AssertInfo(res.size() == real_batch_size,
               "internal error: expr processed rows {} not equal "
               "expect batch size {}",
               res.size(),
               real_batch_size);
    return std::make_shared<ColumnVector>(std::move(res));
}

} //namespace exec
} // namespace milvus
7 changes: 7 additions & 0 deletions internal/core/src/exec/expression/JsonContainsExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ class PhyJsonContainsFilterExpr : public SegmentExpr {
VectorPtr
ExecJsonContainsWithDiffType();

VectorPtr
EvalArrayContainsForIndexSegment();

template <typename ExprValueType>
VectorPtr
ExecArrayContainsForIndexSegmentImpl();

private:
std::shared_ptr<const milvus::expr::JsonContainsExpr> expr_;
};
Expand Down
17 changes: 13 additions & 4 deletions internal/core/src/expr/ITypeExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@ IsMaterializedViewSupported(const DataType& data_type) {
struct ColumnInfo {
FieldId field_id_;
DataType data_type_;
DataType element_type_;
std::vector<std::string> nested_path_;

ColumnInfo(const proto::plan::ColumnInfo& column_info)
: field_id_(column_info.field_id()),
data_type_(static_cast<DataType>(column_info.data_type())),
element_type_(static_cast<DataType>(column_info.element_type())),
nested_path_(column_info.nested_path().begin(),
column_info.nested_path().end()) {
}
Expand All @@ -127,6 +129,7 @@ struct ColumnInfo {
std::vector<std::string> nested_path = {})
: field_id_(field_id),
data_type_(data_type),
element_type_(DataType::NONE),
nested_path_(std::move(nested_path)) {
}

Expand All @@ -140,6 +143,10 @@ struct ColumnInfo {
return false;
}

if (element_type_ != other.element_type_) {
return false;
}

for (int i = 0; i < nested_path_.size(); ++i) {
if (nested_path_[i] != other.nested_path_[i]) {
return false;
Expand All @@ -151,10 +158,12 @@ struct ColumnInfo {

std::string
ToString() const {
return fmt::format("[FieldId:{}, data_type:{}, nested_path:{}]",
std::to_string(field_id_.get()),
data_type_,
milvus::Join<std::string>(nested_path_, ","));
return fmt::format(
"[FieldId:{}, data_type:{}, element_type:{}, nested_path:{}]",
std::to_string(field_id_.get()),
data_type_,
element_type_,
milvus::Join<std::string>(nested_path_, ","));
}
};

Expand Down
105 changes: 47 additions & 58 deletions internal/core/src/index/IndexFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,9 @@ template <typename T>
ScalarIndexPtr<T>
IndexFactory::CreateScalarIndex(
const IndexType& index_type,
const storage::FileManagerContext& file_manager_context,
DataType d_type) {
const storage::FileManagerContext& file_manager_context) {
if (index_type == INVERTED_INDEX_TYPE) {
TantivyConfig cfg;
cfg.data_type_ = d_type;
return std::make_unique<InvertedIndexTantivy<T>>(cfg,
file_manager_context);
return std::make_unique<InvertedIndexTantivy<T>>(file_manager_context);
}
return CreateScalarIndexSort<T>(file_manager_context);
}
Expand All @@ -56,14 +52,11 @@ template <>
ScalarIndexPtr<std::string>
IndexFactory::CreateScalarIndex<std::string>(
const IndexType& index_type,
const storage::FileManagerContext& file_manager_context,
DataType d_type) {
const storage::FileManagerContext& file_manager_context) {
#if defined(__linux__) || defined(__APPLE__)
if (index_type == INVERTED_INDEX_TYPE) {
TantivyConfig cfg;
cfg.data_type_ = d_type;
return std::make_unique<InvertedIndexTantivy<std::string>>(
cfg, file_manager_context);
file_manager_context);
}
return CreateStringIndexMarisa(file_manager_context);
#else
Expand All @@ -76,13 +69,10 @@ ScalarIndexPtr<T>
IndexFactory::CreateScalarIndex(
const IndexType& index_type,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space,
DataType d_type) {
std::shared_ptr<milvus_storage::Space> space) {
if (index_type == INVERTED_INDEX_TYPE) {
TantivyConfig cfg;
cfg.data_type_ = d_type;
return std::make_unique<InvertedIndexTantivy<T>>(
cfg, file_manager_context, space);
return std::make_unique<InvertedIndexTantivy<T>>(file_manager_context,
space);
}
return CreateScalarIndexSort<T>(file_manager_context, space);
}
Expand All @@ -92,14 +82,11 @@ ScalarIndexPtr<std::string>
IndexFactory::CreateScalarIndex<std::string>(
const IndexType& index_type,
const storage::FileManagerContext& file_manager_context,
std::shared_ptr<milvus_storage::Space> space,
DataType d_type) {
std::shared_ptr<milvus_storage::Space> space) {
#if defined(__linux__) || defined(__APPLE__)
if (index_type == INVERTED_INDEX_TYPE) {
TantivyConfig cfg;
cfg.data_type_ = d_type;
return std::make_unique<InvertedIndexTantivy<std::string>>(
cfg, file_manager_context, space);
file_manager_context, space);
}
return CreateStringIndexMarisa(file_manager_context, space);
#else
Expand Down Expand Up @@ -132,48 +119,57 @@ IndexFactory::CreateIndex(
}

IndexBasePtr
IndexFactory::CreateScalarIndex(
const CreateIndexInfo& create_index_info,
IndexFactory::CreatePrimitiveScalarIndex(
DataType data_type,
IndexType index_type,
const storage::FileManagerContext& file_manager_context) {
auto data_type = create_index_info.field_type;
auto index_type = create_index_info.index_type;

switch (data_type) {
// create scalar index
case DataType::BOOL:
return CreateScalarIndex<bool>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<bool>(index_type, file_manager_context);
case DataType::INT8:
return CreateScalarIndex<int8_t>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<int8_t>(index_type, file_manager_context);
case DataType::INT16:
return CreateScalarIndex<int16_t>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<int16_t>(index_type, file_manager_context);
case DataType::INT32:
return CreateScalarIndex<int32_t>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<int32_t>(index_type, file_manager_context);
case DataType::INT64:
return CreateScalarIndex<int64_t>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<int64_t>(index_type, file_manager_context);
case DataType::FLOAT:
return CreateScalarIndex<float>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<float>(index_type, file_manager_context);
case DataType::DOUBLE:
return CreateScalarIndex<double>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<double>(index_type, file_manager_context);

// create string index
case DataType::STRING:
case DataType::VARCHAR:
return CreateScalarIndex<std::string>(
index_type, file_manager_context, data_type);
return CreateScalarIndex<std::string>(index_type,
file_manager_context);
default:
throw SegcoreError(
DataTypeInvalid,
fmt::format("invalid data type to build index: {}", data_type));
}
}

// Creates a scalar index for the field described by create_index_info.
//
// For ARRAY fields the index is built over the array's element type, read
// from the field schema carried by file_manager_context. For every other
// field type the field type itself is used. Construction is delegated to
// CreatePrimitiveScalarIndex in both cases.
IndexBasePtr
IndexFactory::CreateScalarIndex(
    const CreateIndexInfo& create_index_info,
    const storage::FileManagerContext& file_manager_context) {
    const bool is_array = create_index_info.field_type == DataType::ARRAY;
    const DataType indexed_type =
        is_array
            ? static_cast<DataType>(
                  file_manager_context.fieldDataMeta.schema.element_type())
            : create_index_info.field_type;
    return CreatePrimitiveScalarIndex(
        indexed_type, create_index_info.index_type, file_manager_context);
}

IndexBasePtr
IndexFactory::CreateVectorIndex(
const CreateIndexInfo& create_index_info,
Expand Down Expand Up @@ -249,32 +245,25 @@ IndexFactory::CreateScalarIndex(const CreateIndexInfo& create_index_info,
switch (data_type) {
// create scalar index
case DataType::BOOL:
return CreateScalarIndex<bool>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<bool>(index_type, file_manager, space);
case DataType::INT8:
return CreateScalarIndex<int8_t>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<int8_t>(index_type, file_manager, space);
case DataType::INT16:
return CreateScalarIndex<int16_t>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<int16_t>(index_type, file_manager, space);
case DataType::INT32:
return CreateScalarIndex<int32_t>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<int32_t>(index_type, file_manager, space);
case DataType::INT64:
return CreateScalarIndex<int64_t>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<int64_t>(index_type, file_manager, space);
case DataType::FLOAT:
return CreateScalarIndex<float>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<float>(index_type, file_manager, space);
case DataType::DOUBLE:
return CreateScalarIndex<double>(
index_type, file_manager, space, data_type);
return CreateScalarIndex<double>(index_type, file_manager, space);

// create string index
case DataType::STRING:
case DataType::VARCHAR:
return CreateScalarIndex<std::string>(
index_type, file_manager, space, data_type);
index_type, file_manager, space);
default:
throw SegcoreError(
DataTypeInvalid,
Expand Down
Loading

0 comments on commit afd767a

Please sign in to comment.