enhance: speed up array-equal operator via inverted index (#33633)
Signed-off-by: longjiquan <[email protected]>
longjiquan committed Jun 21, 2024
1 parent afd767a commit 869c93d
Showing 14 changed files with 614 additions and 21 deletions.
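In brief: for an Equal/NotEqual filter on an indexed array column, the distinct elements of the query array are looked up in the element-level inverted index and their posting lists are intersected — only rows containing every query element can possibly equal the query array. Exact equality (element order and duplicates) is then verified against the raw data for that small candidate set instead of the whole segment. Below is a minimal standalone sketch of the intersection step, with an illustrative map-based index standing in for the Tantivy-backed one; all names here are hypothetical, not from the Milvus codebase.

#include <cstdint>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Hypothetical element-level inverted index: element -> offsets of rows containing it.
using PostingLists = std::unordered_map<int64_t, std::vector<size_t>>;

// Intersect the posting lists of all query elements to get candidate rows.
// Duplicate query elements do not change the intersection; an empty query
// yields no candidates (the real code falls back to brute force for that case).
std::unordered_set<size_t>
CandidateRows(const PostingLists& index, const std::vector<int64_t>& query) {
    std::unordered_set<size_t> candidates;
    for (size_t i = 0; i < query.size(); i++) {
        auto it = index.find(query[i]);
        if (it == index.end()) {
            return {};  // some element never occurs: no row can match
        }
        std::unordered_set<size_t> hits(it->second.begin(), it->second.end());
        if (i == 0) {
            candidates = std::move(hits);
            continue;
        }
        std::unordered_set<size_t> next;
        for (auto off : candidates) {
            if (hits.count(off) > 0) {
                next.insert(off);
            }
        }
        candidates = std::move(next);
    }
    return candidates;  // still needs the exact comparison: order and duplicates matter
}

The diff below implements this inside the expression framework, streaming hits through callbacks instead of materializing posting lists.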
9 changes: 9 additions & 0 deletions internal/core/src/common/Schema.h
@@ -51,6 +51,15 @@ class Schema {
return field_id;
}

FieldId
AddDebugArrayField(const std::string& name, DataType element_type) {
auto field_id = FieldId(debug_id);
debug_id++;
this->AddField(
FieldName(name), field_id, DataType::ARRAY, element_type);
return field_id;
}
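
A hedged usage sketch of this new test helper (field name illustrative; FieldName, DataType, and Schema are the repo's own types):

    Schema schema;
    // ARRAY field with INT64 elements; the FieldId is auto-assigned from debug_id
    auto field_id = schema.AddDebugArrayField("long_array", DataType::INT64);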

// auto-generate field_id for convenience
FieldId
AddDebugField(const std::string& name,
16 changes: 16 additions & 0 deletions internal/core/src/exec/expression/Expr.h
@@ -280,6 +280,22 @@ class SegmentExpr : public Expr {
return result;
}

template <typename T, typename FUNC, typename... ValTypes>
void
ProcessIndexChunksV2(FUNC func, ValTypes... values) {
typedef std::
conditional_t<std::is_same_v<T, std::string_view>, std::string, T>
IndexInnerType;
using Index = index::ScalarIndex<IndexInnerType>;

for (size_t i = current_index_chunk_; i < num_index_chunk_; i++) {
const Index& index =
segment_->chunk_scalar_index<IndexInnerType>(field_id_, i);
auto* index_ptr = const_cast<Index*>(&index);
func(index_ptr, values...);
}
}
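
Unlike ProcessIndexChunks, which evaluates func per chunk and merges the returned bitmaps into one result, this V2 variant only walks the index chunks and hands each chunk's ScalarIndex to func, leaving aggregation entirely to the caller — which is what lets ExecArrayEqualForIndex below collect raw row offsets through a callback instead of materializing one bitmap per query element.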

template <typename T>
bool
CanUseIndex(OpType op) const {
166 changes: 165 additions & 1 deletion internal/core/src/exec/expression/UnaryExpr.cpp
@@ -20,6 +20,66 @@
namespace milvus {
namespace exec {

template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex() {
return ExecRangeVisitorImplArray<T>();
}

template <>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArrayForIndex<
proto::plan::Array>() {
switch (expr_->op_type_) {
case proto::plan::Equal:
case proto::plan::NotEqual: {
switch (expr_->column_.element_type_) {
case DataType::BOOL: {
return ExecArrayEqualForIndex<bool>(expr_->op_type_ ==
proto::plan::NotEqual);
}
case DataType::INT8: {
return ExecArrayEqualForIndex<int8_t>(
expr_->op_type_ == proto::plan::NotEqual);
}
case DataType::INT16: {
return ExecArrayEqualForIndex<int16_t>(
expr_->op_type_ == proto::plan::NotEqual);
}
case DataType::INT32: {
return ExecArrayEqualForIndex<int32_t>(
expr_->op_type_ == proto::plan::NotEqual);
}
case DataType::INT64: {
return ExecArrayEqualForIndex<int64_t>(
expr_->op_type_ == proto::plan::NotEqual);
}
case DataType::FLOAT:
case DataType::DOUBLE: {
// not accurate on floating-point numbers; fall back to brute force.
return ExecRangeVisitorImplArray<proto::plan::Array>();
}
case DataType::VARCHAR: {
if (segment_->type() == SegmentType::Growing) {
return ExecArrayEqualForIndex<std::string>(
expr_->op_type_ == proto::plan::NotEqual);
} else {
return ExecArrayEqualForIndex<std::string_view>(
expr_->op_type_ == proto::plan::NotEqual);
}
}
default:
PanicInfo(DataTypeInvalid,
"unsupported element type when execute array "
"equal for index: {}",
expr_->column_.element_type_);
}
}
default:
return ExecRangeVisitorImplArray<proto::plan::Array>();
}
}

void
PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
switch (expr_->column_.data_type_) {
@@ -99,7 +159,13 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
result = ExecRangeVisitorImplArray<std::string>();
break;
case proto::plan::GenericValue::ValCase::kArrayVal:
result = ExecRangeVisitorImplArray<proto::plan::Array>();
if (is_index_mode_) {
result = ExecRangeVisitorImplArrayForIndex<
proto::plan::Array>();
} else {
result =
ExecRangeVisitorImplArray<proto::plan::Array>();
}
break;
default:
PanicInfo(
@@ -196,6 +262,104 @@ PhyUnaryRangeFilterExpr::ExecRangeVisitorImplArray() {
return res_vec;
}

template <typename T>
VectorPtr
PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) {
typedef std::
conditional_t<std::is_same_v<T, std::string_view>, std::string, T>
IndexInnerType;
using Index = index::ScalarIndex<IndexInnerType>;
auto real_batch_size = GetNextBatchSize();
if (real_batch_size == 0) {
return nullptr;
}

// get all elements of the query array.
auto val = GetValueFromProto<proto::plan::Array>(expr_->val_);
if (val.array_size() == 0) {
// fall back to brute force: an empty query array lets the index filter out no candidates.
return ExecRangeVisitorImplArray<proto::plan::Array>();
}

// compute the whole-segment result inside ProcessIndexChunks so the framework can cache it.
auto batch_res =
ProcessIndexChunks<IndexInnerType>([this, &val, reverse](Index* _) {
boost::container::vector<IndexInnerType> elems;
for (auto const& element : val.array()) {
auto e = GetValueFromProto<IndexInnerType>(element);
if (std::find(elems.begin(), elems.end(), e) == elems.end()) {
elems.push_back(e);
}
}

// fetch the raw array data at a given row offset (for the exact check below).
auto size_per_chunk = segment_->size_per_chunk();
auto retrieve = [ size_per_chunk, this ](int64_t offset) -> auto {
auto chunk_idx = offset / size_per_chunk;
auto chunk_offset = offset % size_per_chunk;
const auto& chunk =
segment_->template chunk_data<milvus::ArrayView>(field_id_,
chunk_idx);
return chunk.data() + chunk_offset;
};

// compare the array via the raw data.
auto filter = [&retrieve, &val, reverse](size_t offset) -> bool {
auto data_ptr = retrieve(offset);
return data_ptr->is_same_array(val) ^ reverse;
};

// collect all candidates.
std::unordered_set<size_t> candidates;
std::unordered_set<size_t> tmp_candidates;
auto first_callback = [&candidates](size_t offset) -> void {
candidates.insert(offset);
};
auto callback = [&candidates,
&tmp_candidates](size_t offset) -> void {
if (candidates.find(offset) != candidates.end()) {
tmp_candidates.insert(offset);
}
};
auto execute_sub_batch =
[](Index* index_ptr,
const IndexInnerType& val,
const std::function<void(size_t /* offset */)>& callback) {
index_ptr->InApplyCallback(1, &val, callback);
};

// run in-filter.
for (size_t idx = 0; idx < elems.size(); idx++) {
if (idx == 0) {
ProcessIndexChunksV2<IndexInnerType>(
execute_sub_batch, elems[idx], first_callback);
} else {
ProcessIndexChunksV2<IndexInnerType>(
execute_sub_batch, elems[idx], callback);
candidates = std::move(tmp_candidates);
}
// stop early once the candidate set is below 1% of the active rows.
if (candidates.size() * 100 < active_count_) {
break;
}
}
TargetBitmap res(active_count_);
// run the exact post-filter; since the result is cached, it executes only once.
for (const auto& candidate : candidates) {
res[candidate] = filter(candidate);
}
return res;
});
AssertInfo(batch_res.size() == real_batch_size,
"internal error: expr processed rows {} not equal "
"expect batch size {}",
batch_res.size(),
real_batch_size);

// return the result.
return std::make_shared<ColumnVector>(std::move(batch_res));
}
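
Note the early-exit heuristic above: intersection stops once candidates.size() * 100 < active_count_, i.e. once candidates fall below 1% of the active rows. With 1,000,000 active rows, for example, the loop stops refining as soon as fewer than 10,000 candidates remain, and the exact post-filter handles the rest.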

template <typename ExprValueType>
VectorPtr
PhyUnaryRangeFilterExpr::ExecRangeVisitorImplJson() {
8 changes: 8 additions & 0 deletions internal/core/src/exec/expression/UnaryExpr.h
@@ -310,6 +310,14 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr {
VectorPtr
ExecRangeVisitorImplArray();

template <typename T>
VectorPtr
ExecRangeVisitorImplArrayForIndex();

template <typename T>
VectorPtr
ExecArrayEqualForIndex(bool reverse);

// Check overflow and cache result for performance
template <typename T>
ColumnVectorPtr
54 changes: 53 additions & 1 deletion internal/core/src/index/InvertedIndexTantivy.cpp
@@ -204,6 +204,25 @@ apply_hits(TargetBitmap& bitset, const RustArrayWrapper& w, bool v) {
}
}

inline void
apply_hits_with_filter(TargetBitmap& bitset,
const RustArrayWrapper& w,
const std::function<bool(size_t /* offset */)>& filter) {
for (size_t j = 0; j < w.array_.len; j++) {
auto the_offset = w.array_.array[j];
bitset[the_offset] = filter(the_offset);
}
}

inline void
apply_hits_with_callback(
const RustArrayWrapper& w,
const std::function<void(size_t /* offset */)>& callback) {
for (size_t j = 0; j < w.array_.len; j++) {
callback(w.array_.array[j]);
}
}

template <typename T>
const TargetBitmap
InvertedIndexTantivy<T>::In(size_t n, const T* values) {
@@ -215,6 +234,28 @@
return bitset;
}

template <typename T>
const TargetBitmap
InvertedIndexTantivy<T>::InApplyFilter(
size_t n, const T* values, const std::function<bool(size_t)>& filter) {
TargetBitmap bitset(Count());
for (size_t i = 0; i < n; ++i) {
auto array = wrapper_->term_query(values[i]);
apply_hits_with_filter(bitset, array, filter);
}
return bitset;
}

template <typename T>
void
InvertedIndexTantivy<T>::InApplyCallback(
size_t n, const T* values, const std::function<void(size_t)>& callback) {
for (size_t i = 0; i < n; ++i) {
auto array = wrapper_->term_query(values[i]);
apply_hits_with_callback(array, callback);
}
}
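
A hedged usage sketch of the two new entry points (an InvertedIndexTantivy<int64_t> named index is assumed already built; the collector and predicate are illustrative):

    // collect every row offset whose indexed terms include 3 or 5
    std::unordered_set<size_t> offsets;
    int64_t values[] = {3, 5};
    index.InApplyCallback(
        2, values, [&offsets](size_t offset) { offsets.insert(offset); });

    // or keep only hits passing an extra predicate, returned as a bitmap
    auto bitmap = index.InApplyFilter(
        2, values, [](size_t offset) { return offset % 2 == 0; });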

template <typename T>
const TargetBitmap
InvertedIndexTantivy<T>::NotIn(size_t n, const T* values) {
@@ -311,6 +352,9 @@ void
InvertedIndexTantivy<T>::BuildWithRawData(size_t n,
const void* values,
const Config& config) {
if constexpr (std::is_same_v<bool, T>) {
schema_.set_data_type(proto::schema::DataType::Bool);
}
if constexpr (std::is_same_v<int8_t, T>) {
schema_.set_data_type(proto::schema::DataType::Int8);
}
@@ -341,7 +385,15 @@ InvertedIndexTantivy<T>::BuildWithRawData(size_t n,
std::string field = "test_inverted_index";
wrapper_ = std::make_shared<TantivyIndexWrapper>(
field.c_str(), d_type_, path_.c_str());
wrapper_->add_data<T>(static_cast<const T*>(values), n);
if (config.find("is_array") != config.end()) {
// only used in unit tests.
auto arr = static_cast<const boost::container::vector<T>*>(values);
for (size_t i = 0; i < n; i++) {
wrapper_->template add_multi_data(arr[i].data(), arr[i].size());
}
} else {
wrapper_->add_data<T>(static_cast<const T*>(values), n);
}
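
A hedged sketch of the unit-test-only path this enables — feeding one boost vector per row and flagging it via the config (this assumes Config behaves like a JSON map, and the data values are illustrative):

    // two rows, each an int64 array; "is_array" routes ingestion to add_multi_data
    boost::container::vector<int64_t> rows[] = {{1, 2, 3}, {2, 3}};
    Config config;
    config["is_array"] = true;
    index.BuildWithRawData(2, rows, config);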
finish();
}

12 changes: 12 additions & 0 deletions internal/core/src/index/InvertedIndexTantivy.h
@@ -111,6 +111,18 @@ class InvertedIndexTantivy : public ScalarIndex<T> {
const TargetBitmap
In(size_t n, const T* values) override;

const TargetBitmap
InApplyFilter(
size_t n,
const T* values,
const std::function<bool(size_t /* offset */)>& filter) override;

void
InApplyCallback(
size_t n,
const T* values,
const std::function<void(size_t /* offset */)>& callback) override;

const TargetBitmap
NotIn(size_t n, const T* values) override;

14 changes: 14 additions & 0 deletions internal/core/src/index/ScalarIndex.h
@@ -50,6 +50,20 @@ class ScalarIndex : public IndexBase {
virtual const TargetBitmap
In(size_t n, const T* values) = 0;

virtual const TargetBitmap
InApplyFilter(size_t n,
const T* values,
const std::function<bool(size_t /* offset */)>& filter) {
PanicInfo(ErrorCode::Unsupported, "InApplyFilter is not implemented");
}

virtual void
InApplyCallback(size_t n,
const T* values,
const std::function<void(size_t /* offset */)>& callback) {
PanicInfo(ErrorCode::Unsupported, "InApplyCallback is not implemented");
}
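
These base-class defaults panic with Unsupported rather than being pure virtual, so only indexes that can enumerate hit offsets — here, the Tantivy inverted index — need to override them; every other ScalarIndex implementation keeps compiling unchanged, and callers are expected to take these paths only when the index is known to support them.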

virtual const TargetBitmap
NotIn(size_t n, const T* values) = 0;

9 changes: 0 additions & 9 deletions internal/core/thirdparty/tantivy/tantivy-wrapper.h
@@ -51,15 +51,6 @@ struct RustArrayWrapper {
std::cout << ss.str() << std::endl;
}

std::set<uint32_t>
to_set() {
std::set<uint32_t> s;
for (int i = 0; i < array_.len; i++) {
s.insert(array_.array[i]);
}
return s;
}

RustArray array_;

private: