Skip to content

Commit

Permalink
enhance: refactor executor framework V2
Browse files Browse the repository at this point in the history
Signed-off-by: luzhang <[email protected]>
  • Loading branch information
luzhang committed Aug 2, 2024
1 parent a7f7d91 commit f93ea84
Show file tree
Hide file tree
Showing 80 changed files with 1,595 additions and 7,256 deletions.
2 changes: 1 addition & 1 deletion internal/core/src/bitset/bitset.h
Original file line number Diff line number Diff line change
Expand Up @@ -797,13 +797,13 @@ class BitsetBase {
this->data(), other.data(), this->offset(), other.offset(), size);
}

private:
// Return the starting bit offset in our container.
// The value is not stored here: the call is forwarded through the CRTP
// accessor as_derived() to the derived class's offset_impl().
inline size_type
offset() const {
return as_derived().offset_impl();
}

private:
// CRTP
inline ImplT&
as_derived() {
Expand Down
1 change: 1 addition & 0 deletions internal/core/src/common/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ using SegOffset =

// Bitset vocabulary aliases. The boost::dynamic_bitset alias below is kept
// commented out; CustomBitset is the alias currently in effect.
//using BitsetType = boost::dynamic_bitset<>;
using BitsetType = CustomBitset;
// View counterpart of BitsetType, aliased to CustomBitsetView.
using BitsetTypeView = CustomBitsetView;
// Shared-ownership handle to a BitsetType.
using BitsetTypePtr = std::shared_ptr<BitsetType>;
// BitsetType that may be absent.
using BitsetTypeOpt = std::optional<BitsetType>;

Expand Down
17 changes: 16 additions & 1 deletion internal/core/src/common/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ SparseBytesToRows(const Iterable& rows, const bool validate = false) {
// SparseRowsToProto converts a list of knowhere::sparse::SparseRow<float> to
// a milvus::proto::schema::SparseFloatArray. The resulting proto is a deep copy
// of the source data. source(i) returns the i-th row to be copied.
inline void SparseRowsToProto(
inline void
SparseRowsToProto(
const std::function<const knowhere::sparse::SparseRow<float>*(size_t)>&
source,
int64_t rows,
Expand All @@ -287,4 +288,18 @@ inline void SparseRowsToProto(
proto->set_dim(max_dim);
}

// Scope guard: stores a callable and invokes it from the destructor, i.e.
// when the Defer object goes out of scope.
//
// NOTE: the class is copyable (the stored std::function is copied along with
// it), so copying a Defer makes the callable run once per copy. Prefer
// declaring it through DeferLambda and never copying it.
class Defer {
 public:
    // Takes the callable by value and moves it into place to avoid a second
    // std::function copy.
    Defer(std::function<void()> fn) : fn_(std::move(fn)) {
    }

    ~Defer() {
        // Invoking an empty std::function throws std::bad_function_call;
        // thrown from a destructor that would terminate the process.
        if (fn_) {
            fn_();
        }
    }

 private:
    std::function<void()> fn_;
};

// Two-level concatenation: operands of `##` are not macro-expanded, so
// `Defer_##__COUNTER__` would always produce the identifier
// `Defer___COUNTER__` and a second DeferLambda in the same scope would be a
// redefinition. The extra indirection expands __COUNTER__ before pasting.
#define MILVUS_DEFER_CONCAT_IMPL(a, b) a##b
#define MILVUS_DEFER_CONCAT(a, b) MILVUS_DEFER_CONCAT_IMPL(a, b)

// Declares a uniquely named Defer in the current scope that runs `fn` when
// the scope exits.
#define DeferLambda(fn) Defer MILVUS_DEFER_CONCAT(Defer_, __COUNTER__)(fn);

} // namespace milvus
10 changes: 10 additions & 0 deletions internal/core/src/common/Vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ class RowVector : public BaseVector {
}
}

// Builds a ROW vector that takes over `children` (moved into
// children_values_). length_ is then raised to the largest child size seen,
// starting from whatever the BaseVector constructor stored (presumably the 0
// passed as its second argument - confirm against BaseVector).
RowVector(std::vector<VectorPtr>&& children)
    : BaseVector(DataType::ROW, 0) {
    children_values_ = std::move(children);
    for (auto& column : children_values_) {
        const auto column_len = column->size();
        if (column_len > length_) {
            length_ = column_len;
        }
    }
}

const std::vector<VectorPtr>&
childrens() {
return children_values_;
Expand Down
4 changes: 4 additions & 0 deletions internal/core/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ set(MILVUS_EXEC_SRCS
expression/ExistsExpr.cpp
operator/FilterBits.cpp
operator/Operator.cpp
operator/MvccNode.cpp
operator/VectorSearch.cpp
operator/CountNode.cpp
operator/groupby/SearchGroupByOperator.cpp
Driver.cpp
Task.cpp
)
Expand Down
19 changes: 19 additions & 0 deletions internal/core/src/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
#include <cassert>
#include <memory>

#include "common/EasyAssert.h"
#include "exec/operator/CallbackSink.h"
#include "exec/operator/CountNode.h"
#include "exec/operator/FilterBits.h"
#include "exec/operator/MvccNode.h"
#include "exec/operator/Operator.h"
#include "exec/operator/VectorSearch.h"
#include "exec/Task.h"

#include "common/EasyAssert.h"
Expand Down Expand Up @@ -52,6 +56,21 @@ DriverFactory::CreateDriver(std::unique_ptr<DriverContext> ctx,
plannode)) {
operators.push_back(
std::make_unique<FilterBits>(id, ctx.get(), filternode));
} else if (auto mvccnode =
std::dynamic_pointer_cast<const plan::MvccNode>(
plannode)) {
operators.push_back(
std::make_unique<PhyMvccNode>(id, ctx.get(), mvccnode));
} else if (auto countnode =
std::dynamic_pointer_cast<const plan::CountNode>(
plannode)) {
operators.push_back(
std::make_unique<PhyCountNode>(id, ctx.get(), countnode));
} else if (auto vectorsearchnode =
std::dynamic_pointer_cast<const plan::VectorSearchNode>(
plannode)) {
operators.push_back(std::make_unique<PhyVectorSearchNode>(
id, ctx.get(), vectorsearchnode));
}
// TODO: add more operators
}
Expand Down
67 changes: 67 additions & 0 deletions internal/core/src/exec/QueryContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,61 @@ class QueryContext : public Context {
return active_count_;
}

// Returns the current value of the pk_term_offset_cache_initialized_ flag.
bool
get_pk_term_offset_cache_initialized() const {
return pk_term_offset_cache_initialized_;
}

// Overwrites the pk_term_offset_cache_initialized_ flag with `val`.
// This only touches the flag; the cache contents are set separately via
// set_pk_term_offset_cache().
void
set_pk_term_offset_cache_initialized(bool val) {
pk_term_offset_cache_initialized_ = val;
}

// Replaces the stored pk-term offset cache by moving `val` into it.
// Does not modify pk_term_offset_cache_initialized_.
void
set_pk_term_offset_cache(std::vector<int64_t>&& val) {
pk_term_offset_cache_ = std::move(val);
}

// Returns a copy of the stored search info.
// Marked const (it only reads state) so it can be called through a const
// QueryContext, matching get_pk_term_offset_cache_initialized().
milvus::SearchInfo
get_search_info() const {
    return search_info_;
}

// Returns the placeholder-group pointer previously stored with
// set_placeholder_group(). The pointer is returned as-is; this class does not
// allocate or free the pointee in the code shown here.
// Marked const (it only reads state) so it can be called through a const
// QueryContext.
const query::PlaceholderGroup*
get_placeholder_group() const {
    return placeholder_group_;
}

// Stores a copy of `search_info` in this context.
void
set_search_info(const milvus::SearchInfo& search_info) {
search_info_ = search_info;
}

// Stores the raw pointer `placeholder_group`; the pointee is not copied.
// NOTE(review): presumably the caller must keep the pointee alive for as long
// as this context may hand the pointer out - confirm against callers.
void
set_placeholder_group(const query::PlaceholderGroup* placeholder_group) {
placeholder_group_ = placeholder_group;
}

// Move-assigns `result` into the stored search result.
void
set_search_result(milvus::SearchResult&& result) {
search_result_ = std::move(result);
}

// Returns an rvalue reference to the stored search result so the caller can
// move from it. Nothing is moved by this call itself; the member is only
// emptied if the caller constructs or assigns from the returned reference.
// The reference refers to a member of this object, so it must not be used
// after this QueryContext is destroyed.
milvus::SearchResult&&
get_search_result() {
return std::move(search_result_);
}

// Move-assigns `result` into the stored retrieve result.
void
set_retrieve_result(milvus::RetrieveResult&& result) {
retrieve_result_ = std::move(result);
}

// Returns an rvalue reference to the stored retrieve result so the caller can
// move from it. Nothing is moved by this call itself; the member is only
// emptied if the caller constructs or assigns from the returned reference.
// The reference refers to a member of this object, so it must not be used
// after this QueryContext is destroyed.
milvus::RetrieveResult&&
get_retrieve_result() {
return std::move(retrieve_result_);
}

private:
folly::Executor* executor_;
//folly::Executor::KeepAlive<> executor_keepalive_;
Expand All @@ -238,6 +293,18 @@ class QueryContext : public Context {
int64_t active_count_;
// timestamp this query generate
milvus::Timestamp query_timestamp_;

// used for pk term optimization
// Default-initialized so get_pk_term_offset_cache_initialized() never reads an
// indeterminate bool when no constructor sets it.
bool pk_term_offset_cache_initialized_{false};
std::vector<int64_t> pk_term_offset_cache_;

// used for vector search
milvus::SearchInfo search_info_;
// Defaults to nullptr so get_placeholder_group() returns a well-defined value
// before set_placeholder_group() has been called.
const query::PlaceholderGroup* placeholder_group_{nullptr};

// used to store the segment search/retrieve result
milvus::SearchResult search_result_;
milvus::RetrieveResult retrieve_result_;
};

// Represent the state of one thread of query execution.
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/exec/operator/CallbackSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ class CallbackSink : public Operator {
return BlockingReason::kNotBlocked;
}

// Name of this operator as a string.
// `override` already implies the function is virtual, so the keyword is not
// repeated.
std::string
ToString() const override {
    return "CallbackSink";
}

private:
void
Close() override {
Expand Down
72 changes: 72 additions & 0 deletions internal/core/src/exec/operator/CountNode.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "CountNode.h"

namespace milvus {
namespace exec {

static std::unique_ptr<milvus::RetrieveResult>
wrap_num_entities(int64_t cnt, int64_t size) {
auto retrieve_result = std::make_unique<milvus::RetrieveResult>();
DataArray arr;
arr.set_type(milvus::proto::schema::Int64);
auto scalar = arr.mutable_scalars();
scalar->mutable_long_data()->mutable_data()->Add(cnt);
retrieve_result->field_data_ = {arr};
retrieve_result->total_data_cnt_ = size;
return retrieve_result;
}

// Constructs the count operator for plan node `node`. The base Operator is
// initialized from the driver context, the node's output type, the operator
// id and the node id; the query context reached through operator_context_ is
// then cached together with its segment, query timestamp and active count.
PhyCountNode::PhyCountNode(int32_t operator_id,
                           DriverContext* driverctx,
                           const std::shared_ptr<const plan::CountNode>& node)
    : Operator(driverctx, node->output_type(), operator_id, node->id()) {
    query_context_ =
        operator_context_->get_exec_context()->get_query_context();

    // Snapshot the per-query values the operator keeps locally.
    segment_ = query_context_->get_segment();
    query_timestamp_ = query_context_->get_query_timestamp();
    active_count_ = query_context_->get_active_count();
}

// Takes over `input` by moving it into input_; the caller's pointer is left
// in a moved-from state. Any previously held input_ is replaced.
void
PhyCountNode::AddInput(RowVectorPtr& input) {
input_ = std::move(input);
}

// Produces the count once: returns nullptr until no_more_input_ is set (or
// after the operator has already finished). Otherwise it views the input
// column as a bitmap, counts the bits that are NOT set, stores that count
// (with the bitmap size as total) as the query's retrieve result, marks the
// operator finished and passes the input through.
RowVectorPtr
PhyCountNode::GetOutput() {
    const bool ready = !is_finished_ && no_more_input_;
    if (!ready) {
        return nullptr;
    }

    auto bitmap_column = GetColumnVector(input_);
    TargetBitmapView bitmap(bitmap_column->GetRawData(),
                            bitmap_column->size());

    // The reported count is the number of cleared bits in the bitmap.
    auto unset_bits = bitmap.size() - bitmap.count();
    auto wrapped = wrap_num_entities(unset_bits, bitmap.size());
    query_context_->set_retrieve_result(std::move(*wrapped));

    is_finished_ = true;
    return input_;
}

// Reports whether GetOutput() has already produced the count
// (is_finished_ is set there).
bool
PhyCountNode::IsFinished() {
return is_finished_;
}

} // namespace exec
} // namespace milvus
78 changes: 78 additions & 0 deletions internal/core/src/exec/operator/CountNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>
#include <string>

#include "exec/Driver.h"
#include "exec/expression/Expr.h"
#include "exec/operator/Operator.h"
#include "exec/QueryContext.h"

namespace milvus {
namespace exec {

class PhyCountNode : public Operator {
public:
PhyCountNode(int32_t operator_id,
DriverContext* ctx,
const std::shared_ptr<const plan::CountNode>& node);

bool
IsFilter() override {
return false;
}

bool
NeedInput() const override {
return !is_finished_;
}

void
AddInput(RowVectorPtr& input);

RowVectorPtr
GetOutput() override;

bool
IsFinished() override;

void
Close() override {
}

BlockingReason
IsBlocked(ContinueFuture* /* unused */) override {
return BlockingReason::kNotBlocked;
}

virtual std::string
ToString() const override {
return "PhyCountNode";
}

private:
const segcore::SegmentInternalInterface* segment_;
milvus::Timestamp query_timestamp_;
int64_t active_count_;
QueryContext* query_context_;
bool is_finished_{false};
};

} // namespace exec
} // namespace milvus
Loading

0 comments on commit f93ea84

Please sign in to comment.