Skip to content

Commit

Permalink
feat: support inverted index for array (#33452) (#34053)
Browse files Browse the repository at this point in the history
pr: #33184
pr: #33452
pr: #33633
issue: #27704
Co-authored-by: xiaocai2333 <[email protected]>

---------

Signed-off-by: Cai Zhang <[email protected]>
Signed-off-by: longjiquan <[email protected]>
Co-authored-by: cai.zhang <[email protected]>
  • Loading branch information
longjiquan and xiaocai2333 authored Jun 24, 2024
1 parent 630a726 commit 22e6807
Show file tree
Hide file tree
Showing 54 changed files with 1,980 additions and 560 deletions.
9 changes: 9 additions & 0 deletions internal/core/src/common/Schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ class Schema {
return field_id;
}

// Register an ARRAY field named `name` whose elements are of `element_type`,
// using the next auto-generated debug field id, and return that id.
FieldId
AddDebugArrayField(const std::string& name, DataType element_type) {
    // Post-increment: hand out the current debug id, then advance the counter.
    auto array_field_id = FieldId(debug_id++);
    this->AddField(
        FieldName(name), array_field_id, DataType::ARRAY, element_type);
    return array_field_id;
}

// auto gen field_id for convenience
FieldId
AddDebugField(const std::string& name,
Expand Down
16 changes: 16 additions & 0 deletions internal/core/src/exec/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,22 @@ class SegmentExpr : public Expr {
return result;
}

template <typename T, typename FUNC, typename... ValTypes>
void
ProcessIndexChunksV2(FUNC func, ValTypes... values) {
typedef std::
conditional_t<std::is_same_v<T, std::string_view>, std::string, T>
IndexInnerType;
using Index = index::ScalarIndex<IndexInnerType>;

for (size_t i = current_index_chunk_; i < num_index_chunk_; i++) {
const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(field_id_, i);
auto* index_ptr = const_cast<Index*>(&index);
func(index_ptr, values...);
}
}

template <typename T>
bool
CanUseIndex(OpType op) const {
Expand Down
98 changes: 95 additions & 3 deletions internal/core/src/exec/expression/JsonContainsExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@ namespace exec {
void
PhyJsonContainsFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
switch (expr_->column_.data_type_) {
case DataType::ARRAY:
case DataType::ARRAY: {
if (is_index_mode_) {
result = EvalArrayContainsForIndexSegment();
} else {
result = EvalJsonContainsForDataSegment();
}
break;
}
case DataType::JSON: {
if (is_index_mode_) {
PanicInfo(
Expand Down Expand Up @@ -94,7 +101,6 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment() {
return ExecJsonContainsWithDiffType();
}
}
break;
}
case proto::plan::JSONContainsExpr_JSONOp_ContainsAll: {
if (IsArrayDataType(data_type)) {
Expand Down Expand Up @@ -145,7 +151,6 @@ PhyJsonContainsFilterExpr::EvalJsonContainsForDataSegment() {
return ExecJsonContainsAllWithDiffType();
}
}
break;
}
default:
PanicInfo(ExprInvalid,
Expand Down Expand Up @@ -748,5 +753,92 @@ PhyJsonContainsFilterExpr::ExecJsonContainsWithDiffType() {
return res_vec;
}

// Dispatch an array-contains expression on an indexed segment to the
// implementation instantiated for the array's element type.
// Panics with DataTypeInvalid for element types that are not handled.
VectorPtr
PhyJsonContainsFilterExpr::EvalArrayContainsForIndexSegment() {
    const auto element_type = expr_->column_.element_type_;
    switch (element_type) {
        case DataType::BOOL:
            return ExecArrayContainsForIndexSegmentImpl<bool>();
        case DataType::INT8:
            return ExecArrayContainsForIndexSegmentImpl<int8_t>();
        case DataType::INT16:
            return ExecArrayContainsForIndexSegmentImpl<int16_t>();
        case DataType::INT32:
            return ExecArrayContainsForIndexSegmentImpl<int32_t>();
        case DataType::INT64:
            return ExecArrayContainsForIndexSegmentImpl<int64_t>();
        case DataType::FLOAT:
            return ExecArrayContainsForIndexSegmentImpl<float>();
        case DataType::DOUBLE:
            return ExecArrayContainsForIndexSegmentImpl<double>();
        // Both string-like element types share the std::string instantiation.
        case DataType::VARCHAR:
        case DataType::STRING:
            return ExecArrayContainsForIndexSegmentImpl<std::string>();
        default:
            PanicInfo(DataTypeInvalid,
                      fmt::format("unsupported data type for "
                                  "ExecArrayContainsForIndexSegmentImpl: {}",
                                  element_type));
    }
}

// Evaluate array_contains / array_contains_any / array_contains_all against
// the scalar index of an array field.
//
// ExprValueType is the element type of the array; std::string_view is mapped
// to std::string for the index lookups.
// Returns nullptr when the next batch is empty, otherwise a ColumnVector
// holding the bitmap produced by ProcessIndexChunks.
template <typename ExprValueType>
VectorPtr
PhyJsonContainsFilterExpr::ExecArrayContainsForIndexSegmentImpl() {
    using GetType =
        std::conditional_t<std::is_same_v<ExprValueType, std::string_view>,
                           std::string,
                           ExprValueType>;
    using Index = index::ScalarIndex<GetType>;

    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }

    // Deduplicate the requested values, then copy them into a contiguous
    // container so that a plain pointer can be handed to the index.
    std::unordered_set<GetType> unique_vals;
    for (const auto& proto_val : expr_->vals_) {
        unique_vals.insert(GetValueFromProto<GetType>(proto_val));
    }
    boost::container::vector<GetType> elems(unique_vals.begin(),
                                            unique_vals.end());

    auto execute_sub_batch =
        [this](Index* index_ptr,
               const boost::container::vector<GetType>& vals) {
            switch (expr_->op_) {
                // Contains / ContainsAny: a single In() over all values.
                case proto::plan::JSONContainsExpr_JSONOp_Contains:
                case proto::plan::JSONContainsExpr_JSONOp_ContainsAny:
                    return index_ptr->In(vals.size(), vals.data());
                // ContainsAll: start from all-ones and intersect with the
                // In() result of every single value.
                case proto::plan::JSONContainsExpr_JSONOp_ContainsAll: {
                    TargetBitmap matched(index_ptr->Count());
                    matched.set();
                    for (const auto& single_val : vals) {
                        auto sub = index_ptr->In(1, &single_val);
                        matched &= sub;
                    }
                    return matched;
                }
                default:
                    PanicInfo(
                        ExprInvalid,
                        "unsupported array contains type {}",
                        proto::plan::JSONContainsExpr_JSONOp_Name(expr_->op_));
            }
        };

    auto res = ProcessIndexChunks<GetType>(execute_sub_batch, elems);
    AssertInfo(res.size() == real_batch_size,
               "internal error: expr processed rows {} not equal "
               "expect batch size {}",
               res.size(),
               real_batch_size);
    return std::make_shared<ColumnVector>(std::move(res));
}

} //namespace exec
} // namespace milvus
7 changes: 7 additions & 0 deletions internal/core/src/exec/expression/JsonContainsExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ class PhyJsonContainsFilterExpr : public SegmentExpr {
VectorPtr
ExecJsonContainsWithDiffType();

// Entry point for array-contains evaluation in index mode: dispatches on the
// array's element type to ExecArrayContainsForIndexSegmentImpl.
VectorPtr
EvalArrayContainsForIndexSegment();

// Index-based implementation of array contains / contains-any / contains-all
// for arrays whose elements are of type ExprValueType.
template <typename ExprValueType>
VectorPtr
ExecArrayContainsForIndexSegmentImpl();

private:
std::shared_ptr<const milvus::expr::JsonContainsExpr> expr_;
};
Expand Down
166 changes: 165 additions & 1 deletion internal/core/src/exec/expression/UnaryExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,66 @@
namespace milvus {
namespace exec {

// Generic case: for value types without a dedicated specialization there is
// no index-based path, so this simply forwards to the raw-data
// implementation ExecRangeVisitorImplArray<T>().
template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex() {
return ExecRangeVisitorImplArray<T>();
}

// Specialization for comparisons against a whole array value.
// Only Equal / NotEqual take the index-assisted path; every other operator
// goes through ExecRangeVisitorImplArray<proto::plan::Array>().
// Panics with DataTypeInvalid for element types that are not handled.
template <>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex<
    proto::plan::Array>() {
    const auto op_type = expr_->op_type_;
    if (op_type != proto::plan::Equal && op_type != proto::plan::NotEqual) {
        return ExecRangeVisitorImplArray<proto::plan::Array>();
    }

    // NotEqual is evaluated as the reverse of Equal.
    const bool is_not_equal = (op_type == proto::plan::NotEqual);
    switch (expr_->column_.element_type_) {
        case DataType::BOOL:
            return ExecArrayEqualForIndex<bool>(is_not_equal);
        case DataType::INT8:
            return ExecArrayEqualForIndex<int8_t>(is_not_equal);
        case DataType::INT16:
            return ExecArrayEqualForIndex<int16_t>(is_not_equal);
        case DataType::INT32:
            return ExecArrayEqualForIndex<int32_t>(is_not_equal);
        case DataType::INT64:
            return ExecArrayEqualForIndex<int64_t>(is_not_equal);
        case DataType::FLOAT:
        case DataType::DOUBLE:
            // not accurate on floating point number, rollback to bruteforce.
            return ExecRangeVisitorImplArray<proto::plan::Array>();
        case DataType::VARCHAR:
            // Growing segments use std::string, all others std::string_view.
            if (segment_->type() == SegmentType::Growing) {
                return ExecArrayEqualForIndex<std::string>(is_not_equal);
            }
            return ExecArrayEqualForIndex<std::string_view>(is_not_equal);
        default:
            PanicInfo(DataTypeInvalid,
                      "unsupported element type when execute array "
                      "equal for index: {}",
                      expr_->column_.element_type_);
    }
}

void
PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
switch (expr_->column_.data_type_) {
Expand Down Expand Up @@ -99,7 +159,13 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
result = ExecRangeVisitorImplArray<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result = ExecRangeVisitorImplArray<proto::plan::Array>();
if (is_index_mode_) {
result = ExecRangeVisitorImplArrayForIndex<
proto::plan::Array>();
} else {
result =
ExecRangeVisitorImplArray<proto::plan::Array>();
}
break;
default:
PanicInfo(
Expand Down Expand Up @@ -196,6 +262,104 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray() {
return res_vec;
}

// Evaluate `array_field == val` (reverse == false) or `array_field != val`
// (reverse == true) with the help of the scalar index on the array elements.
//
// Strategy:
//   1. Use the index to collect "candidate" row offsets, i.e. rows reported
//      by the index for the distinct elements of `val` (intersected element
//      by element until the candidate set is small enough).
//   2. Compare only the candidates against `val` using the raw array data.
// Rows that are not candidates cannot be equal to `val`; they are therefore
// false for Equal and true for NotEqual.
//
// T is the element type; std::string_view is mapped to std::string for the
// index. Returns nullptr when the next batch is empty. An empty `val` falls
// back to ExecRangeVisitorImplArray, since the index cannot filter anything.
template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) {
    typedef std::
        conditional_t<std::is_same_v<T, std::string_view>, std::string, T>
            IndexInnerType;
    using Index = index::ScalarIndex<IndexInnerType>;
    auto real_batch_size = GetNextBatchSize();
    if (real_batch_size == 0) {
        return nullptr;
    }

    // get all elements.
    auto val = GetValueFromProto<proto::plan::Array>(expr_->val_);
    if (val.array_size() == 0) {
        // rollback to bruteforce. no candidates will be filtered out via index.
        return ExecRangeVisitorImplArray<proto::plan::Array>();
    }

    // cache the result to suit the framework.
    auto batch_res =
        ProcessIndexChunks<IndexInnerType>([this, &val, reverse](Index* _) {
            // Distinct elements of the target array, in first-seen order.
            boost::container::vector<IndexInnerType> elems;
            for (auto const& element : val.array()) {
                auto e = GetValueFromProto<IndexInnerType>(element);
                if (std::find(elems.begin(), elems.end(), e) == elems.end()) {
                    elems.push_back(e);
                }
            }

            // filtering by index, get candidates.
            auto size_per_chunk = segment_->size_per_chunk();
            // Map a segment-wide row offset to a pointer to its raw array.
            auto retrieve = [size_per_chunk, this](int64_t offset) -> auto {
                auto chunk_idx = offset / size_per_chunk;
                auto chunk_offset = offset % size_per_chunk;
                const auto& chunk =
                    segment_->template chunk_data<milvus::ArrayView>(
                        field_id_, chunk_idx);
                return chunk.data() + chunk_offset;
            };

            // compare the array via the raw data.
            auto filter = [&retrieve, &val, reverse](size_t offset) -> bool {
                auto data_ptr = retrieve(offset);
                return data_ptr->is_same_array(val) ^ reverse;
            };

            // collect all candidates.
            std::unordered_set<size_t> candidates;
            std::unordered_set<size_t> tmp_candidates;
            // First element: every reported offset is a candidate.
            auto first_callback = [&candidates](size_t offset) -> void {
                candidates.insert(offset);
            };
            // Later elements: keep only offsets that are already candidates.
            auto callback = [&candidates,
                             &tmp_candidates](size_t offset) -> void {
                if (candidates.find(offset) != candidates.end()) {
                    tmp_candidates.insert(offset);
                }
            };
            auto execute_sub_batch =
                [](Index* index_ptr,
                   const IndexInnerType& val,
                   const std::function<void(size_t /* offset */)>& callback) {
                    index_ptr->InApplyCallback(1, &val, callback);
                };

            // run in-filter.
            for (size_t idx = 0; idx < elems.size(); idx++) {
                if (idx == 0) {
                    ProcessIndexChunksV2<IndexInnerType>(
                        execute_sub_batch, elems[idx], first_callback);
                } else {
                    ProcessIndexChunksV2<IndexInnerType>(
                        execute_sub_batch, elems[idx], callback);
                    // Swap instead of move-assign, then clear explicitly:
                    // a moved-from set is only "valid but unspecified", and
                    // tmp_candidates must be empty for the next round.
                    candidates.swap(tmp_candidates);
                    tmp_candidates.clear();
                }
                // the size of candidates is small enough.
                if (candidates.size() * 100 < active_count_) {
                    break;
                }
            }

            TargetBitmap res(active_count_);
            if (reverse) {
                // NotEqual: rows that are not candidates can never be equal
                // to `val`, so they must be reported as matching.
                res.set();
            }
            // run post-filter. The filter will only be executed once in the framework.
            for (const auto& candidate : candidates) {
                res[candidate] = filter(candidate);
            }
            return res;
        });
    AssertInfo(batch_res.size() == real_batch_size,
               "internal error: expr processed rows {} not equal "
               "expect batch size {}",
               batch_res.size(),
               real_batch_size);

    // return the result.
    return std::make_shared<ColumnVector>(std::move(batch_res));
}

template <typename ExprValueType>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson() {
Expand Down
8 changes: 8 additions & 0 deletions internal/core/src/exec/expression/UnaryExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,14 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
VectorPtr
ExecRangeVisitorImplArray();

// Index-mode variant of ExecRangeVisitorImplArray. The generic version
// forwards to ExecRangeVisitorImplArray<T>(); the proto::plan::Array
// specialization routes Equal / NotEqual to ExecArrayEqualForIndex.
template <typename T>
VectorPtr
ExecRangeVisitorImplArrayForIndex();

// Index-assisted whole-array equality for arrays with element type T.
// `reverse` is true when evaluating NotEqual instead of Equal.
template <typename T>
VectorPtr
ExecArrayEqualForIndex(bool reverse);

// Check overflow and cache result for performance
template <typename T>
ColumnVectorPtr
Expand Down
Loading

0 comments on commit 22e6807

Please sign in to comment.