Skip to content

Commit

Permalink
feat: Implement custom function module in milvus expr
Browse files Browse the repository at this point in the history
OSPP 2024 project: https://summer-ospp.ac.cn/org/prodetail/247410235?list=org&navpage=org

Solutions:

- parser (planparserv2)
    - add CallExpr in planparserv2/Plan.g4
    - update parser_visitor and show_visitor
- grpc protobuf
    - add CallExpr in plan.proto
- execution (`core/src/exec`)
    - add `CallExpr` `ValueExpr` and `ColumnExpr` (both logical and
      physical) for function call and function parameters
- function factory (`core/src/exec/expression/function`)
    - create a global hashmap when starting milvus (see server.go)
    - the global hashmap stores function signatures and their function
      pointers, the CallExpr in execution engine can get the function pointer by
      function signature.
- custom functions
    - empty(string)
- add cpp/go unittests and E2E tests

closes: #36559

Signed-off-by: Yinzuo Jiang <[email protected]>
  • Loading branch information
jiangyinzuo committed Oct 13, 2024
1 parent d230b91 commit 12824a8
Show file tree
Hide file tree
Showing 40 changed files with 2,147 additions and 258 deletions.
13 changes: 4 additions & 9 deletions docs/design_docs/segcore/visitor.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
# Visitor Pattern
Visitor Pattern is used in segcore for parse and execute Execution Plan.

1. Inside `${core}/src/query/PlanNode.h`, contains physical plan for vector search:
1. Inside `${internal/core}/src/query/PlanNode.h`, contains physical plan for vector search:
1. `FloatVectorANNS` FloatVector search execution node
2. `BinaryVectorANNS` BinaryVector search execution node
2. `${core}/src/query/Expr.h` contains physical plan for scalar expression:
2. `${internal/core}/src/query/Expr.h` contains physical plan for scalar expression:
1. `TermExpr` support operation like `col in [1, 2, 3]`
2. `RangeExpr` support constant compare with data column like `a >= 5` `1 < b < 2`
3. `CompareExpr` support compare with different columns, like `a < b`
4. `LogicalBinaryExpr` support and/or
5. `LogicalUnaryExpr` support not

Currently, under `${core/query/visitors}` directory, there are the following visitors:
1. `ShowPlanNodeVisitor` prints PlanNode in json
2. `ShowExprVisitor` Expr -> json
3. `Verify...Visitor` validates ...
4. `ExtractInfo...Visitor` extracts info from..., including involved_fields and else
5. `ExecExprVisitor` generates bitmask according to expression
6. `ExecPlanNodeVistor` physical plan executor only supports ANNS node for now
Currently, under `${internal/core/src/query}` directory, there are the following visitors:
1. `ExecPlanNodeVistor` physical plan executor only supports ANNS node for now
6 changes: 6 additions & 0 deletions internal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/segcore/
FILES_MATCHING PATTERN "*_c.h"
)

# Install exec/expression/function
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/exec/expression/function/
DESTINATION include/exec/expression/function
FILES_MATCHING PATTERN "*_c.h"
)

# Install indexbuilder
install(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/src/indexbuilder/
DESTINATION include/indexbuilder
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/common/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ using float16 = knowhere::fp16;
using bfloat16 = knowhere::bf16;
using bin1 = knowhere::bin1;

// See also: schema.proto
enum class DataType {
NONE = 0,
BOOL = 1,
Expand Down
89 changes: 81 additions & 8 deletions internal/core/src/common/Vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@
#pragma once

#include <memory>
#include <string>

#include <boost/variant.hpp>
#include "common/FieldData.h"
#include "common/FieldDataInterface.h"
#include "common/Types.h"

namespace milvus {

/**
* @brief base class for different type vector
* @todo implement full null value support
*/

class BaseVector {
public:
BaseVector(DataType data_type,
Expand Down Expand Up @@ -87,7 +88,7 @@ class ColumnVector final : public BaseVector {
}

void*
GetRawData() {
GetRawData() const {
return values_->Data();
}

Expand All @@ -103,6 +104,80 @@ class ColumnVector final : public BaseVector {

using ColumnVectorPtr = std::shared_ptr<ColumnVector>;

template <typename T>
class ValueVector : public BaseVector {
public:
ValueVector(DataType data_type,
size_t length,
std::optional<size_t> null_count = std::nullopt)
: BaseVector(data_type, length) {
}

virtual const T
GetValueAt(size_t index) const = 0;

virtual TargetBitmap
Apply(std::function<bool(const T&)> func) = 0;
};

template <typename T>
class ConstantVector final : public ValueVector<T> {
public:
ConstantVector(DataType data_type,
size_t length,
const T& val,
std::optional<size_t> null_count = std::nullopt)
: ValueVector<T>(data_type, length, null_count), val_(val) {
}

const T
GetValueAt(size_t _) const override {
return val_;
}

TargetBitmap
Apply(std::function<bool(const T&)> func) override {
return TargetBitmap(this->size(), func(val_));
}

private:
const T val_;
};

// TODO: simd support
template <typename T>
class ColumnValueVector final : public ValueVector<T> {
public:
ColumnValueVector(DataType data_type,
size_t length,
std::optional<size_t> null_count = std::nullopt)
: ValueVector<T>(data_type, length, null_count) {
vec_.reserve(length);
}

void
Set(size_t index, const T& val) {
vec_[index] = val;
};

const T
GetValueAt(size_t i) const override {
return vec_[i];
}

TargetBitmap
Apply(std::function<bool(const T&)> func) override {
TargetBitmap result_vec(this->size());
for (int i = 0; i < this->size(); ++i) {
result_vec.set(i, func(GetValueAt(i)));
}
return result_vec;
}

private:
FixedVector<T> vec_;
};

/**
* @brief Multi vectors for scalar types
* mainly using it to pass internal result in segcore scalar engine system
Expand Down Expand Up @@ -130,8 +205,7 @@ class RowVector : public BaseVector {
}

RowVector(std::vector<VectorPtr>&& children)
: BaseVector(DataType::ROW, 0) {
children_values_ = std::move(children);
: BaseVector(DataType::ROW, 0), children_values_(std::move(children)) {
for (auto& child : children_values_) {
if (child->size() > length_) {
length_ = child->size();
Expand All @@ -140,12 +214,12 @@ class RowVector : public BaseVector {
}

const std::vector<VectorPtr>&
childrens() {
childrens() const {
return children_values_;
}

VectorPtr
child(int index) {
child(int index) const {
assert(index < children_values_.size());
return children_values_[index];
}
Expand All @@ -155,5 +229,4 @@ class RowVector : public BaseVector {
};

using RowVectorPtr = std::shared_ptr<RowVector>;

} // namespace milvus
2 changes: 1 addition & 1 deletion internal/core/src/common/init_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,4 @@ SetTrace(CTraceConfig* config) {
config->oltpSecure,
config->nodeID};
milvus::tracer::initTelemetry(traceConfig);
}
}
2 changes: 1 addition & 1 deletion internal/core/src/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,4 @@ Task::Next(ContinueFuture* future) {
}

} // namespace exec
} // namespace milvus
} // namespace milvus
46 changes: 46 additions & 0 deletions internal/core/src/exec/expression/CallExpr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "common/FieldDataInterface.h"
#include "common/Vector.h"
#include "exec/expression/CallExpr.h"
#include "exec/expression/EvalCtx.h"
#include "exec/expression/function/FunctionFactory.h"

#include <utility>
#include <vector>

namespace milvus {
namespace exec {

void
PhyCallExpr::Eval(EvalCtx& context, VectorPtr& result) {
AssertInfo(inputs_.size() == expr_->inputs().size(),
"logical call expr needs {} inputs, but {} inputs are provided",
expr_->inputs().size(),
inputs_.size());
std::vector<VectorPtr> args;
for (auto &input: this->inputs_) {
VectorPtr arg_result;
input->Eval(context, arg_result);
args.push_back(std::move(arg_result));
}
RowVector row_vector(std::move(args));
this->expr_->function_ptr()(context, row_vector, result);
}

} // namespace exec
} // namespace milvus
83 changes: 83 additions & 0 deletions internal/core/src/exec/expression/CallExpr.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "common/EasyAssert.h"
#include "common/FieldDataInterface.h"
#include "common/Utils.h"
#include "common/Vector.h"
#include "exec/expression/EvalCtx.h"
#include "exec/expression/Expr.h"
#include "exec/expression/function/FunctionFactory.h"
#include "expr/ITypeExpr.h"
#include "fmt/core.h"
#include "segcore/SegmentInterface.h"

namespace milvus {
namespace exec {

class PhyCallExpr : public Expr {
public:
PhyCallExpr(const std::vector<std::shared_ptr<Expr>>& input,
const std::shared_ptr<const milvus::expr::CallExpr>& expr,
const std::string& name,
const segcore::SegmentInternalInterface* segment,
int64_t active_count,
int64_t batch_size)
: Expr(DataType::BOOL, std::move(input), name),
expr_(expr),
active_count_(active_count),
segment_(segment),
batch_size_(batch_size) {
size_per_chunk_ = segment_->size_per_chunk();
num_chunk_ = upper_div(active_count_, size_per_chunk_);
AssertInfo(
batch_size_ > 0,
fmt::format("expr batch size should greater than zero, but now: {}",
batch_size_));
}

void
Eval(EvalCtx& context, VectorPtr& result) override;

void
MoveCursor() override {
for (auto input : inputs_) {
input->MoveCursor();
}
}

private:
std::shared_ptr<const milvus::expr::CallExpr> expr_;

int64_t active_count_{0};
int64_t num_chunk_{0};
int64_t current_chunk_id_{0};
int64_t current_chunk_pos_{0};
int64_t size_per_chunk_{0};

const segcore::SegmentInternalInterface* segment_;
int64_t batch_size_;
};

} // namespace exec
} // namespace milvus
Loading

0 comments on commit 12824a8

Please sign in to comment.