Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/2.5' into cp25/fix-l0-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
XuanYang-cn committed Nov 1, 2024
2 parents 3729f8d + 116bf50 commit 1c00852
Show file tree
Hide file tree
Showing 15 changed files with 607 additions and 62 deletions.
12 changes: 12 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1066,3 +1066,15 @@ streaming:
backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default
txn:
defaultKeepaliveTimeout: 10s # The default keepalive timeout for wal txn, 10s by default

# Any configuration related to the knowhere vector search engine
knowhere:
enable: true # When this configuration is enabled, the index parameters defined below are automatically populated as index parameters, without requiring user input.
DISKANN: # Index parameters for diskann
build: # Diskann build params
max_degree: 56 # Maximum degree of the Vamana graph
search_list_size: 100 # Size of the candidate list during building graph
pq_code_budget_gb_ratio: 0.125 # Size limit on the PQ code (compared with raw data)
search_cache_budget_gb_ratio: 0.1 # Ratio of cached node numbers to raw data
search: # Diskann search params
beam_width_ratio: 4.0 # Ratio between the maximum number of IO requests per search iteration and CPU number.
6 changes: 6 additions & 0 deletions internal/core/src/common/Json.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class Json {

value_result<document>
doc() const {
if (data_.size() == 0) {
return {};
}
thread_local simdjson::ondemand::parser parser;

// it's always safe to add the padding,
Expand All @@ -148,6 +151,9 @@ class Json {

value_result<simdjson::dom::element>
dom_doc() const {
if (data_.size() == 0) {
return {};
}
thread_local simdjson::dom::parser parser;

// it's always safe to add the padding,
Expand Down
13 changes: 12 additions & 1 deletion internal/core/src/storage/LocalChunkManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// limitations under the License.

#include "LocalChunkManager.h"
#include "log/Log.h"

#include <boost/filesystem.hpp>
#include <boost/system/error_code.hpp>
Expand Down Expand Up @@ -232,7 +233,17 @@ LocalChunkManager::GetSizeOfDir(const std::string& dir) {
it != v.end();
++it) {
if (boost::filesystem::is_regular_file(it->path())) {
total_file_size += boost::filesystem::file_size(it->path());
boost::system::error_code ec;
auto file_size = boost::filesystem::file_size(it->path(), ec);
if (ec) {
// The file may be removed concurrently by other threads.
// So the file size cannot be obtained, just ignore it.
LOG_INFO("size of file {} cannot be obtained with error: {}",
it->path().string(),
ec.message());
continue;
}
total_file_size += file_size;
}
if (boost::filesystem::is_directory(it->path())) {
total_file_size += GetSizeOfDir(it->path().string());
Expand Down
15 changes: 15 additions & 0 deletions internal/datacoord/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/vecindexmgr"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparams"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -159,6 +161,19 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule

fieldID := dependency.meta.indexMeta.GetFieldIDByIndexID(segIndex.CollectionID, segIndex.IndexID)
binlogIDs := getBinLogIDs(segment, fieldID)

// When new index parameters are added, these parameters need to be updated to ensure they are included during the index-building process.
if vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) && Params.KnowhereConfig.Enable.GetAsBool() {
var ret error
indexParams, ret = Params.KnowhereConfig.UpdateIndexParams(GetIndexType(indexParams), paramtable.BuildStage, indexParams)

if ret != nil {
log.Ctx(ctx).Warn("failed to update index build params defined in yaml", zap.Int64("taskID", it.taskID), zap.Error(ret))
it.SetState(indexpb.JobState_JobStateInit, ret.Error())
return false
}
}

if isDiskANNIndex(GetIndexType(indexParams)) {
var err error
indexParams, err = indexparams.UpdateDiskIndexBuildParams(Params, indexParams)
Expand Down
8 changes: 7 additions & 1 deletion internal/indexnode/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion()))

indexType := it.newIndexParams[common.IndexTypeKey]
var fieldDataSize uint64
if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(indexType) {
// check index node support disk index
if !Params.IndexNodeCfg.EnableDisk.GetAsBool() {
Expand All @@ -225,7 +226,7 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
log.Warn("IndexNode get local used size failed")
return err
}
fieldDataSize, err := estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
fieldDataSize, err = estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType())
if err != nil {
log.Warn("IndexNode get local used size failed")
return err
Expand All @@ -247,6 +248,11 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
}
}

// system resource-related parameters, such as memory limits, CPU limits, and disk limits, are appended here to the parameter list
if vecindexmgr.GetVecIndexMgrInstance().IsVecIndex(indexType) && Params.KnowhereConfig.Enable.GetAsBool() {
it.newIndexParams, _ = Params.KnowhereConfig.MergeResourceParams(fieldDataSize, paramtable.BuildStage, it.newIndexParams)
}

storageConfig := &indexcgopb.StorageConfig{
Address: it.req.GetStorageConfig().GetAddress(),
AccessKeyID: it.req.GetStorageConfig().GetAccessKeyID(),
Expand Down
11 changes: 5 additions & 6 deletions internal/parser/planparserv2/parser_visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,7 @@ func (v *ParserVisitor) VisitTerm(ctx *parser.TermContext) interface{} {
} else {
elementValue := valueExpr.GetValue()
if elementValue == nil {
return fmt.Errorf(
"contains_any operation are only supported explicitly specified element, got: %s", ctx.Expr(1).GetText())
return fmt.Errorf("value '%s' in list cannot be a non-const expression", ctx.Expr(1).GetText())
}

if !IsArray(elementValue) {
Expand Down Expand Up @@ -662,12 +661,12 @@ func (v *ParserVisitor) VisitRange(ctx *parser.RangeContext) interface{} {
lowerValue := lowerValueExpr.GetValue()
upperValue := upperValueExpr.GetValue()
if !isTemplateExpr(lowerValueExpr) {
if err = checkRangeCompared(fieldDataType, lowerValue); err != nil {
if lowerValue, err = castRangeValue(fieldDataType, lowerValue); err != nil {
return err
}
}
if !isTemplateExpr(upperValueExpr) {
if err = checkRangeCompared(fieldDataType, upperValue); err != nil {
if upperValue, err = castRangeValue(fieldDataType, upperValue); err != nil {
return err
}
}
Expand Down Expand Up @@ -744,12 +743,12 @@ func (v *ParserVisitor) VisitReverseRange(ctx *parser.ReverseRangeContext) inter
lowerValue := lowerValueExpr.GetValue()
upperValue := upperValueExpr.GetValue()
if !isTemplateExpr(lowerValueExpr) {
if err = checkRangeCompared(fieldDataType, lowerValue); err != nil {
if lowerValue, err = castRangeValue(fieldDataType, lowerValue); err != nil {
return err
}
}
if !isTemplateExpr(upperValueExpr) {
if err = checkRangeCompared(fieldDataType, upperValue); err != nil {
if upperValue, err = castRangeValue(fieldDataType, upperValue); err != nil {
return err
}
}
Expand Down
23 changes: 22 additions & 1 deletion internal/parser/planparserv2/plan_parser_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,28 @@ func TestExpr_BinaryRange(t *testing.T) {
}
}

func TestExpr_castValue(t *testing.T) {
schema := newTestSchema()
helper, err := typeutil.CreateSchemaHelper(schema)
assert.NoError(t, err)

exprStr := `Int64Field + 1.1 == 2.1`
expr, err := ParseExpr(helper, exprStr, nil)
assert.NoError(t, err, exprStr)
assert.NotNil(t, expr, exprStr)
assert.NotNil(t, expr.GetBinaryArithOpEvalRangeExpr())
assert.NotNil(t, expr.GetBinaryArithOpEvalRangeExpr().GetRightOperand().GetFloatVal())
assert.NotNil(t, expr.GetBinaryArithOpEvalRangeExpr().GetValue().GetFloatVal())

exprStr = `FloatField +1 == 2`
expr, err = ParseExpr(helper, exprStr, nil)
assert.NoError(t, err, exprStr)
assert.NotNil(t, expr, exprStr)
assert.NotNil(t, expr.GetBinaryArithOpEvalRangeExpr())
assert.NotNil(t, expr.GetBinaryArithOpEvalRangeExpr().GetRightOperand().GetFloatVal())
assert.NotNil(t, expr.GetBinaryArithOpEvalRangeExpr().GetValue().GetFloatVal())
}

func TestExpr_BinaryArith(t *testing.T) {
schema := newTestSchema()
helper, err := typeutil.CreateSchemaHelper(schema)
Expand All @@ -283,7 +305,6 @@ func TestExpr_BinaryArith(t *testing.T) {
`Int64Field % 10 == 9`,
`Int64Field % 10 != 9`,
`FloatField + 1.1 == 2.1`,
`Int64Field + 1.1 == 2.1`,
`A % 10 != 2`,
`Int8Field + 1 < 2`,
`Int16Field - 3 <= 4`,
Expand Down
34 changes: 23 additions & 11 deletions internal/parser/planparserv2/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,21 +241,30 @@ func castValue(dataType schemapb.DataType, value *planpb.GenericValue) (*planpb.
return nil, fmt.Errorf("cannot cast value to %s, value: %s", dataType.String(), value)
}

func combineBinaryArithExpr(op planpb.OpType, arithOp planpb.ArithOpType, columnInfo *planpb.ColumnInfo, operandExpr, valueExpr *planpb.ValueExpr) *planpb.Expr {
func combineBinaryArithExpr(op planpb.OpType, arithOp planpb.ArithOpType, arithExprDataType schemapb.DataType, columnInfo *planpb.ColumnInfo, operandExpr, valueExpr *planpb.ValueExpr) (*planpb.Expr, error) {
var err error
operand := operandExpr.GetValue()
if !isTemplateExpr(operandExpr) {
operand, err = castValue(arithExprDataType, operand)
if err != nil {
return nil, err
}
}

return &planpb.Expr{
Expr: &planpb.Expr_BinaryArithOpEvalRangeExpr{
BinaryArithOpEvalRangeExpr: &planpb.BinaryArithOpEvalRangeExpr{
ColumnInfo: columnInfo,
ArithOp: arithOp,
RightOperand: operandExpr.GetValue(),
RightOperand: operand,
Op: op,
Value: valueExpr.GetValue(),
OperandTemplateVariableName: operandExpr.GetTemplateVariableName(),
ValueTemplateVariableName: valueExpr.GetTemplateVariableName(),
},
},
IsTemplate: isTemplateExpr(operandExpr) || isTemplateExpr(valueExpr),
}
}, nil
}

func combineArrayLengthExpr(op planpb.OpType, arithOp planpb.ArithOpType, columnInfo *planpb.ColumnInfo, valueExpr *planpb.ValueExpr) (*planpb.Expr, error) {
Expand Down Expand Up @@ -297,7 +306,7 @@ func handleBinaryArithExpr(op planpb.OpType, arithExpr *planpb.BinaryArithExpr,
// a * 2 == 3
// a / 2 == 3
// a % 2 == 3
return combineBinaryArithExpr(op, arithOp, leftExpr.GetInfo(), rightValue, valueExpr), nil
return combineBinaryArithExpr(op, arithOp, arithExprDataType, leftExpr.GetInfo(), rightValue, valueExpr)
} else if rightExpr != nil && leftValue != nil {
// 2 + a == 3
// 2 - a == 3
Expand All @@ -307,7 +316,7 @@ func handleBinaryArithExpr(op planpb.OpType, arithExpr *planpb.BinaryArithExpr,

switch arithExpr.GetOp() {
case planpb.ArithOpType_Add, planpb.ArithOpType_Mul:
return combineBinaryArithExpr(op, arithOp, rightExpr.GetInfo(), leftValue, valueExpr), nil
return combineBinaryArithExpr(op, arithOp, arithExprDataType, rightExpr.GetInfo(), leftValue, valueExpr)
default:
return nil, fmt.Errorf("module field is not yet supported")
}
Expand Down Expand Up @@ -625,24 +634,27 @@ func checkValidModArith(tokenType planpb.ArithOpType, leftType, leftElementType,
return nil
}

func checkRangeCompared(dataType schemapb.DataType, value *planpb.GenericValue) error {
func castRangeValue(dataType schemapb.DataType, value *planpb.GenericValue) (*planpb.GenericValue, error) {
switch dataType {
case schemapb.DataType_String, schemapb.DataType_VarChar:
if !IsString(value) {
return fmt.Errorf("invalid range operations")
return nil, fmt.Errorf("invalid range operations")
}
case schemapb.DataType_Bool:
return fmt.Errorf("invalid range operations on boolean expr")
return nil, fmt.Errorf("invalid range operations on boolean expr")
case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32, schemapb.DataType_Int64:
if !IsInteger(value) {
return fmt.Errorf("invalid range operations")
return nil, fmt.Errorf("invalid range operations")
}
case schemapb.DataType_Float, schemapb.DataType_Double:
if !IsNumber(value) {
return fmt.Errorf("invalid range operations")
return nil, fmt.Errorf("invalid range operations")
}
if IsInteger(value) {
return NewFloat(float64(value.GetInt64Val())), nil
}
}
return nil
return value, nil
}

func checkContainsElement(columnExpr *ExprWithType, op planpb.JSONContainsExpr_JSONOp, elementValue *planpb.GenericValue) error {
Expand Down
8 changes: 8 additions & 0 deletions internal/proxy/task_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,14 @@ func (cit *createIndexTask) parseIndexParams() error {
if !exist {
return fmt.Errorf("IndexType not specified")
}
// index parameters defined in the YAML file are merged with the user-provided parameters during create stage
if Params.KnowhereConfig.Enable.GetAsBool() {
var err error
indexParamsMap, err = Params.KnowhereConfig.MergeIndexParams(indexType, paramtable.BuildStage, indexParamsMap)
if err != nil {
return err
}
}
if vecindexmgr.GetVecIndexMgrInstance().IsDiskANN(indexType) {
err := indexparams.FillDiskIndexParams(Params, indexParamsMap)
if err != nil {
Expand Down
38 changes: 38 additions & 0 deletions internal/proxy/task_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down Expand Up @@ -1053,6 +1054,43 @@ func Test_parseIndexParams(t *testing.T) {
err := cit.parseIndexParams()
assert.Error(t, err)
})

t.Run("verify merge params with yaml", func(t *testing.T) {
paramtable.Init()
Params.Save("knowhere.HNSW.build.M", "3000")
Params.Save("knowhere.HNSW.build.efConstruction", "120")
defer Params.Reset("knowhere.HNSW.build.M")
defer Params.Reset("knowhere.HNSW.build.efConstruction")

cit := &createIndexTask{
Condition: nil,
req: &milvuspb.CreateIndexRequest{
ExtraParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "HNSW",
},
{
Key: common.MetricTypeKey,
Value: metric.L2,
},
},
IndexName: "",
},
fieldSchema: &schemapb.FieldSchema{
FieldID: 101,
Name: "FieldVector",
IsPrimaryKey: false,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "768"},
},
},
}
err := cit.parseIndexParams()
// Out of range in json: param 'M' (3000) should be in range [2, 2048]
assert.Error(t, err)
})
}

func Test_wrapUserIndexParams(t *testing.T) {
Expand Down
Loading

0 comments on commit 1c00852

Please sign in to comment.