From 1d61b604e1a2b8363d70029362fbbecbef6ff086 Mon Sep 17 00:00:00 2001 From: Gao Date: Wed, 23 Oct 2024 19:19:30 +0800 Subject: [PATCH] enhance: support retry search when topk is reduced and result not enough (#35645) issue: #35576 This PR covers the cases where queryHook optimizes the search params and thereby makes the result size insufficient: it adds a retry-search mechanism and related metrics for alerting. --------- Signed-off-by: chasingegg --- internal/proto/internal.proto | 2 + internal/proxy/impl.go | 96 ++++++++++++++----- internal/proxy/task_search.go | 18 ++++ internal/proxy/task_search_test.go | 6 +- internal/querynodev2/delegator/delegator.go | 1 + internal/querynodev2/optimizers/query_hook.go | 8 +- .../querynodev2/optimizers/query_hook_test.go | 42 +++++--- internal/querynodev2/segments/result.go | 10 ++ internal/querynodev2/services.go | 3 + internal/querynodev2/services_test.go | 26 +++-- pkg/metrics/proxy_metrics.go | 41 ++++++++ pkg/util/paramtable/autoindex_param.go | 13 ++- 12 files changed, 218 insertions(+), 48 deletions(-) diff --git a/internal/proto/internal.proto b/internal/proto/internal.proto index 154191d2db51c..d882ff007a478 100644 --- a/internal/proto/internal.proto +++ b/internal/proto/internal.proto @@ -126,6 +126,7 @@ message SearchRequest { int64 group_by_field_id = 23; int64 group_size = 24; int64 field_id = 25; + bool is_topk_reduce = 26; } message SubSearchResults { @@ -161,6 +162,7 @@ message SearchResults { repeated SubSearchResults sub_results = 15; bool is_advanced = 16; int64 all_search_count = 17; + bool is_topk_reduce = 18; } message CostAggregation { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index ec7b059854f87..ed7ab17641a81 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2899,9 +2899,30 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) rsp := &milvuspb.SearchResults{ Status: merr.Success(), } + + optimizedSearch := true + resultSizeInsufficient := false 
+ isTopkReduce := false err2 := retry.Handle(ctx, func() (bool, error) { - rsp, err = node. - search(ctx, request) + rsp, resultSizeInsufficient, isTopkReduce, err = node.search(ctx, request, optimizedSearch) + if merr.Ok(rsp.GetStatus()) && optimizedSearch && resultSizeInsufficient && isTopkReduce && paramtable.Get().AutoIndexConfig.EnableResultLimitCheck.GetAsBool() { + // without optimize search + optimizedSearch = false + rsp, resultSizeInsufficient, isTopkReduce, err = node.search(ctx, request, optimizedSearch) + metrics.ProxyRetrySearchCount.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + metrics.SearchLabel, + request.GetCollectionName(), + ).Inc() + // result size still insufficient + if resultSizeInsufficient { + metrics.ProxyRetrySearchResultInsufficientCount.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + metrics.SearchLabel, + request.GetCollectionName(), + ).Inc() + } + } if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) { return true, merr.Error(rsp.GetStatus()) } @@ -2913,11 +2934,13 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) return rsp, err } -func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) { - metrics.GetStats(ctx). - SetNodeID(paramtable.GetNodeID()). - SetInboundLabel(metrics.SearchLabel). 
- SetCollectionName(request.GetCollectionName()) +func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest, optimizedSearch bool) (*milvuspb.SearchResults, bool, bool, error) { + receiveSize := proto.Size(request) + metrics.ProxyReceiveBytes.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + metrics.SearchLabel, + request.GetCollectionName(), + ).Add(float64(receiveSize)) metrics.ProxyReceivedNQ.WithLabelValues( strconv.FormatInt(paramtable.GetNodeID(), 10), @@ -2928,7 +2951,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.SearchResults{ Status: merr.Status(err), - }, nil + }, false, false, nil } method := "Search" @@ -2949,7 +2972,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) if err != nil { return &milvuspb.SearchResults{ Status: merr.Status(err), - }, nil + }, false, false, nil } request.PlaceholderGroup = placeholderGroupBytes @@ -2963,7 +2986,8 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) commonpbutil.WithMsgType(commonpb.MsgType_Search), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), - ReqID: paramtable.GetNodeID(), + ReqID: paramtable.GetNodeID(), + IsTopkReduce: optimizedSearch, }, request: request, tr: timerecord.NewTimeRecorder("search"), @@ -3017,7 +3041,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) return &milvuspb.SearchResults{ Status: merr.Status(err), - }, nil + }, false, false, nil } tr.CtxRecord(ctx, "search request enqueue") @@ -3043,7 +3067,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) return &milvuspb.SearchResults{ Status: merr.Status(err), - }, nil + }, false, false, nil } span := tr.CtxRecord(ctx, "wait search result") @@ -3100,7 +3124,7 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) 
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeSearch, dbName, username).Add(float64(v)) } } - return qt.result, nil + return qt.result, qt.resultSizeInsufficient, qt.isTopkReduce, nil } func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest) (*milvuspb.SearchResults, error) { @@ -3108,8 +3132,29 @@ func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSea rsp := &milvuspb.SearchResults{ Status: merr.Success(), } + optimizedSearch := true + resultSizeInsufficient := false + isTopkReduce := false err2 := retry.Handle(ctx, func() (bool, error) { - rsp, err = node.hybridSearch(ctx, request) + rsp, resultSizeInsufficient, isTopkReduce, err = node.hybridSearch(ctx, request, optimizedSearch) + if merr.Ok(rsp.GetStatus()) && optimizedSearch && resultSizeInsufficient && isTopkReduce && paramtable.Get().AutoIndexConfig.EnableResultLimitCheck.GetAsBool() { + // without optimize search + optimizedSearch = false + rsp, resultSizeInsufficient, isTopkReduce, err = node.hybridSearch(ctx, request, optimizedSearch) + metrics.ProxyRetrySearchCount.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + metrics.HybridSearchLabel, + request.GetCollectionName(), + ).Inc() + // result size still insufficient + if resultSizeInsufficient { + metrics.ProxyRetrySearchResultInsufficientCount.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + metrics.HybridSearchLabel, + request.GetCollectionName(), + ).Inc() + } + } if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) { return true, merr.Error(rsp.GetStatus()) } @@ -3121,16 +3166,18 @@ func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSea return rsp, err } -func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest) (*milvuspb.SearchResults, error) { - metrics.GetStats(ctx). - SetNodeID(paramtable.GetNodeID()). - SetInboundLabel(metrics.HybridSearchLabel). 
- SetCollectionName(request.GetCollectionName()) +func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest, optimizedSearch bool) (*milvuspb.SearchResults, bool, bool, error) { + receiveSize := proto.Size(request) + metrics.ProxyReceiveBytes.WithLabelValues( + strconv.FormatInt(paramtable.GetNodeID(), 10), + metrics.HybridSearchLabel, + request.GetCollectionName(), + ).Add(float64(receiveSize)) if err := merr.CheckHealthy(node.GetStateCode()); err != nil { return &milvuspb.SearchResults{ Status: merr.Status(err), - }, nil + }, false, false, nil } method := "HybridSearch" @@ -3154,7 +3201,8 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea commonpbutil.WithMsgType(commonpb.MsgType_Search), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), - ReqID: paramtable.GetNodeID(), + ReqID: paramtable.GetNodeID(), + IsTopkReduce: optimizedSearch, }, request: newSearchReq, tr: timerecord.NewTimeRecorder(method), @@ -3203,7 +3251,7 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea return &milvuspb.SearchResults{ Status: merr.Status(err), - }, nil + }, false, false, nil } tr.CtxRecord(ctx, "hybrid search request enqueue") @@ -3228,7 +3276,7 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea return &milvuspb.SearchResults{ Status: merr.Status(err), - }, nil + }, false, false, nil } span := tr.CtxRecord(ctx, "wait hybrid search result") @@ -3285,7 +3333,7 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeHybridSearch, dbName, username).Add(float64(v)) } } - return qt.result, nil + return qt.result, qt.resultSizeInsufficient, qt.isTopkReduce, nil } func (node *Proxy) getVectorPlaceholderGroupForSearchByPks(ctx context.Context, request *milvuspb.SearchRequest) ([]byte, error) { diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go 
index 4e14c7b938717..86810c2abc7c6 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -62,6 +62,8 @@ type searchTask struct { partitionKeyMode bool enableMaterializedView bool mustUsePartitionKey bool + resultSizeInsufficient bool + isTopkReduce bool userOutputFields []string userDynamicFields []string @@ -644,7 +646,11 @@ func (t *searchTask) PostExecute(ctx context.Context) error { t.queryChannelsTs = make(map[string]uint64) t.relatedDataSize = 0 + isTopkReduce := false for _, r := range toReduceResults { + if r.GetIsTopkReduce() { + isTopkReduce = true + } t.relatedDataSize += r.GetCostAggregation().GetTotalRelatedDataSize() for ch, ts := range r.GetChannelsMvcc() { t.queryChannelsTs[ch] = ts @@ -657,6 +663,7 @@ func (t *searchTask) PostExecute(ctx context.Context) error { return err } + // reduce if t.SearchRequest.GetIsAdvanced() { multipleInternalResults := make([][]*internalpb.SearchResults, len(t.SearchRequest.GetSubReqs())) for _, searchResult := range toReduceResults { @@ -713,6 +720,17 @@ func (t *searchTask) PostExecute(ctx context.Context) error { } } + // reduce done, get final result + limit := t.SearchRequest.GetTopk() - t.SearchRequest.GetOffset() + resultSizeInsufficient := false + for _, topk := range t.result.Results.Topks { + if topk < limit { + resultSizeInsufficient = true + break + } + } + t.resultSizeInsufficient = resultSizeInsufficient + t.isTopkReduce = isTopkReduce t.result.CollectionName = t.collectionName t.fillInFieldInfo() diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index 9ed4a03f27dec..c0cbbeefc659a 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -72,7 +72,9 @@ func TestSearchTask_PostExecute(t *testing.T) { task := &searchTask{ ctx: ctx, collectionName: collName, - SearchRequest: &internalpb.SearchRequest{}, + SearchRequest: &internalpb.SearchRequest{ + IsTopkReduce: true, + }, request: &milvuspb.SearchRequest{ 
CollectionName: collName, Nq: 1, @@ -98,6 +100,8 @@ func TestSearchTask_PostExecute(t *testing.T) { err := qt.PostExecute(context.TODO()) assert.NoError(t, err) assert.Equal(t, qt.result.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success) + assert.Equal(t, qt.resultSizeInsufficient, true) + assert.Equal(t, qt.isTopkReduce, false) }) } diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 6cb63aa65a79c..5c56ebd0ff4a2 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -382,6 +382,7 @@ func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest GroupByFieldId: subReq.GetGroupByFieldId(), GroupSize: subReq.GetGroupSize(), FieldId: subReq.GetFieldId(), + IsTopkReduce: req.GetReq().GetIsTopkReduce(), } future := conc.Go(func() (*internalpb.SearchResults, error) { searchReq := &querypb.SearchRequest{ diff --git a/internal/querynodev2/optimizers/query_hook.go b/internal/querynodev2/optimizers/query_hook.go index 9f0f7b6e5886a..2a00d206866d3 100644 --- a/internal/querynodev2/optimizers/query_hook.go +++ b/internal/querynodev2/optimizers/query_hook.go @@ -27,6 +27,7 @@ type QueryHook interface { func OptimizeSearchParams(ctx context.Context, req *querypb.SearchRequest, queryHook QueryHook, numSegments int) (*querypb.SearchRequest, error) { // no hook applied or disabled, just return if queryHook == nil || !paramtable.Get().AutoIndexConfig.Enable.GetAsBool() { + req.Req.IsTopkReduce = false return req, nil } @@ -67,7 +68,7 @@ func OptimizeSearchParams(ctx context.Context, req *querypb.SearchRequest, query common.SegmentNumKey: estSegmentNum, common.WithFilterKey: withFilter, common.DataTypeKey: int32(plan.GetVectorAnns().GetVectorType()), - common.WithOptimizeKey: paramtable.Get().AutoIndexConfig.EnableOptimize.GetAsBool(), + common.WithOptimizeKey: paramtable.Get().AutoIndexConfig.EnableOptimize.GetAsBool() && 
req.GetReq().GetIsTopkReduce(), common.CollectionKey: req.GetReq().GetCollectionID(), } if withFilter && channelNum > 1 { @@ -78,7 +79,9 @@ func OptimizeSearchParams(ctx context.Context, req *querypb.SearchRequest, query log.Warn("failed to execute queryHook", zap.Error(err)) return nil, merr.WrapErrServiceUnavailable(err.Error(), "queryHook execution failed") } - queryInfo.Topk = params[common.TopKKey].(int64) + finalTopk := params[common.TopKKey].(int64) + isTopkReduce := req.GetReq().GetIsTopkReduce() && (finalTopk < queryInfo.GetTopk()) + queryInfo.Topk = finalTopk queryInfo.SearchParams = params[common.SearchParamKey].(string) serializedExprPlan, err := proto.Marshal(&plan) if err != nil { @@ -86,6 +89,7 @@ func OptimizeSearchParams(ctx context.Context, req *querypb.SearchRequest, query return nil, merr.WrapErrParameterInvalid("marshalable search plan", "plan with marshal error", err.Error()) } req.Req.SerializedExprPlan = serializedExprPlan + req.Req.IsTopkReduce = isTopkReduce log.Debug("optimized search params done", zap.Any("queryInfo", queryInfo)) default: log.Warn("not supported node type", zap.String("nodeType", fmt.Sprintf("%T", plan.GetNode()))) diff --git a/internal/querynodev2/optimizers/query_hook_test.go b/internal/querynodev2/optimizers/query_hook_test.go index f34acd8ccd15f..ac5f8ed505d6e 100644 --- a/internal/querynodev2/optimizers/query_hook_test.go +++ b/internal/querynodev2/optimizers/query_hook_test.go @@ -47,27 +47,43 @@ func (suite *QueryHookSuite) TestOptimizeSearchParam() { suite.queryHook = nil }() - plan := &planpb.PlanNode{ - Node: &planpb.PlanNode_VectorAnns{ - VectorAnns: &planpb.VectorANNS{ - QueryInfo: &planpb.QueryInfo{ - Topk: 100, - SearchParams: `{"param": 1}`, + getPlan := func(topk int64) *planpb.PlanNode { + return &planpb.PlanNode{ + Node: &planpb.PlanNode_VectorAnns{ + VectorAnns: &planpb.VectorANNS{ + QueryInfo: &planpb.QueryInfo{ + Topk: topk, + SearchParams: `{"param": 1}`, + }, }, }, - }, + } } - bs, err := 
proto.Marshal(plan) + + bs, err := proto.Marshal(getPlan(100)) suite.Require().NoError(err) req, err := OptimizeSearchParams(ctx, &querypb.SearchRequest{ Req: &internalpb.SearchRequest{ SerializedExprPlan: bs, + IsTopkReduce: true, + }, + TotalChannelNum: 2, + }, suite.queryHook, 2) + suite.NoError(err) + suite.verifyQueryInfo(req, 50, true, `{"param": 2}`) + + bs, err = proto.Marshal(getPlan(50)) + suite.Require().NoError(err) + req, err = OptimizeSearchParams(ctx, &querypb.SearchRequest{ + Req: &internalpb.SearchRequest{ + SerializedExprPlan: bs, + IsTopkReduce: true, }, TotalChannelNum: 2, }, suite.queryHook, 2) suite.NoError(err) - suite.verifyQueryInfo(req, 50, `{"param": 2}`) + suite.verifyQueryInfo(req, 50, false, `{"param": 2}`) }) suite.Run("disable optimization", func() { @@ -95,7 +111,7 @@ func (suite *QueryHookSuite) TestOptimizeSearchParam() { TotalChannelNum: 2, }, suite.queryHook, 2) suite.NoError(err) - suite.verifyQueryInfo(req, 100, `{"param": 1}`) + suite.verifyQueryInfo(req, 100, false, `{"param": 1}`) }) suite.Run("no_hook", func() { @@ -118,11 +134,12 @@ func (suite *QueryHookSuite) TestOptimizeSearchParam() { req, err := OptimizeSearchParams(ctx, &querypb.SearchRequest{ Req: &internalpb.SearchRequest{ SerializedExprPlan: bs, + IsTopkReduce: true, }, TotalChannelNum: 2, }, suite.queryHook, 2) suite.NoError(err) - suite.verifyQueryInfo(req, 100, `{"param": 1}`) + suite.verifyQueryInfo(req, 100, false, `{"param": 1}`) }) suite.Run("other_plannode", func() { @@ -203,7 +220,7 @@ func (suite *QueryHookSuite) TestOptimizeSearchParam() { }) } -func (suite *QueryHookSuite) verifyQueryInfo(req *querypb.SearchRequest, topK int64, param string) { +func (suite *QueryHookSuite) verifyQueryInfo(req *querypb.SearchRequest, topK int64, isTopkReduce bool, param string) { planBytes := req.GetReq().GetSerializedExprPlan() plan := planpb.PlanNode{} @@ -213,6 +230,7 @@ func (suite *QueryHookSuite) verifyQueryInfo(req *querypb.SearchRequest, topK in queryInfo := 
plan.GetVectorAnns().GetQueryInfo() suite.Equal(topK, queryInfo.GetTopk()) suite.Equal(param, queryInfo.GetSearchParams()) + suite.Equal(isTopkReduce, req.GetReq().GetIsTopkReduce()) } func TestOptimizeSearchParam(t *testing.T) { diff --git a/internal/querynodev2/segments/result.go b/internal/querynodev2/segments/result.go index f6e3efc0eae64..c3eaa032cc9fb 100644 --- a/internal/querynodev2/segments/result.go +++ b/internal/querynodev2/segments/result.go @@ -63,10 +63,14 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult defer sp.End() channelsMvcc := make(map[string]uint64) + isTopkReduce := false for _, r := range results { for ch, ts := range r.GetChannelsMvcc() { channelsMvcc[ch] = ts } + if r.GetIsTopkReduce() { + isTopkReduce = true + } // shouldn't let new SearchResults.MetricType to be empty, though the req.MetricType is empty if info.GetMetricType() == "" { info.SetMetricType(r.MetricType) @@ -120,6 +124,7 @@ func ReduceSearchResults(ctx context.Context, results []*internalpb.SearchResult }, 0) searchResults.CostAggregation.TotalRelatedDataSize = relatedDataSize searchResults.ChannelsMvcc = channelsMvcc + searchResults.IsTopkReduce = isTopkReduce return searchResults, nil } @@ -129,11 +134,15 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear channelsMvcc := make(map[string]uint64) relatedDataSize := int64(0) + isTopkReduce := false searchResults := &internalpb.SearchResults{ IsAdvanced: true, } for index, result := range results { + if result.GetIsTopkReduce() { + isTopkReduce = true + } relatedDataSize += result.GetCostAggregation().GetTotalRelatedDataSize() for ch, ts := range result.GetChannelsMvcc() { channelsMvcc[ch] = ts @@ -169,6 +178,7 @@ func ReduceAdvancedSearchResults(ctx context.Context, results []*internalpb.Sear searchResults.CostAggregation = &internalpb.CostAggregation{} } searchResults.CostAggregation.TotalRelatedDataSize = relatedDataSize + searchResults.IsTopkReduce = 
isTopkReduce return searchResults, nil } diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 7c16165ef6dbb..77b2d1c4ee134 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -721,6 +721,9 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe resp = task.SearchResult() resp.GetCostAggregation().ResponseTime = tr.ElapseSpan().Milliseconds() resp.GetCostAggregation().TotalNQ = node.scheduler.GetWaitingTaskTotalNQ() + if req.GetReq().GetIsTopkReduce() { + resp.IsTopkReduce = true + } return resp, nil } diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 86d88b37320b4..686733b744be6 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -1157,7 +1157,7 @@ func (suite *ServiceSuite) TestGetSegmentInfo_Failed() { } // Test Search -func (suite *ServiceSuite) genCSearchRequest(nq int64, dataType schemapb.DataType, fieldID int64, metricType string) (*internalpb.SearchRequest, error) { +func (suite *ServiceSuite) genCSearchRequest(nq int64, dataType schemapb.DataType, fieldID int64, metricType string, isTopkReduce bool) (*internalpb.SearchRequest, error) { placeHolder, err := genPlaceHolderGroup(nq) if err != nil { return nil, err @@ -1181,6 +1181,7 @@ func (suite *ServiceSuite) genCSearchRequest(nq int64, dataType schemapb.DataTyp DslType: commonpb.DslType_BoolExprV1, Nq: nq, MvccTimestamp: typeutil.MaxTimestamp, + IsTopkReduce: isTopkReduce, }, nil } @@ -1190,7 +1191,7 @@ func (suite *ServiceSuite) TestSearch_Normal() { suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() - creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType) + creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, false) req := &querypb.SearchRequest{ Req: creq, @@ -1214,7 +1215,7 @@ func (suite *ServiceSuite) 
TestSearch_Concurrent() { futures := make([]*conc.Future[*internalpb.SearchResults], 0, concurrency) for i := 0; i < concurrency; i++ { future := conc.Go(func() (*internalpb.SearchResults, error) { - creq, err := suite.genCSearchRequest(30, schemapb.DataType_FloatVector, 107, defaultMetricType) + creq, err := suite.genCSearchRequest(30, schemapb.DataType_FloatVector, 107, defaultMetricType, false) req := &querypb.SearchRequest{ Req: creq, @@ -1240,7 +1241,7 @@ func (suite *ServiceSuite) TestSearch_Failed() { // data schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, false) - creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, "invalidMetricType") + creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, "invalidMetricType", false) req := &querypb.SearchRequest{ Req: creq, @@ -1319,6 +1320,7 @@ func (suite *ServiceSuite) TestSearchSegments_Unhealthy() { rsp, err := suite.node.SearchSegments(ctx, req) suite.NoError(err) + suite.Equal(false, rsp.GetIsTopkReduce()) suite.Equal(commonpb.ErrorCode_NotReadyServe, rsp.GetStatus().GetErrorCode()) suite.Equal(merr.Code(merr.ErrServiceNotReady), rsp.GetStatus().GetCode()) } @@ -1338,6 +1340,7 @@ func (suite *ServiceSuite) TestSearchSegments_Failed() { rsp, err := suite.node.SearchSegments(ctx, req) suite.NoError(err) + suite.Equal(false, rsp.GetIsTopkReduce()) suite.Equal(commonpb.ErrorCode_UnexpectedError, rsp.GetStatus().GetErrorCode()) suite.Equal(merr.Code(merr.ErrCollectionNotLoaded), rsp.GetStatus().GetCode()) @@ -1358,7 +1361,7 @@ func (suite *ServiceSuite) TestSearchSegments_Normal() { suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() - creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType) + creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, false) req := &querypb.SearchRequest{ Req: creq, @@ -1369,6 +1372,14 @@ func (suite 
*ServiceSuite) TestSearchSegments_Normal() { rsp, err := suite.node.SearchSegments(ctx, req) suite.NoError(err) + suite.Equal(rsp.GetIsTopkReduce(), false) + suite.Equal(commonpb.ErrorCode_Success, rsp.GetStatus().GetErrorCode()) + + req.Req, err = suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, true) + suite.NoError(err) + rsp, err = suite.node.SearchSegments(ctx, req) + suite.NoError(err) + suite.Equal(rsp.GetIsTopkReduce(), true) suite.Equal(commonpb.ErrorCode_Success, rsp.GetStatus().GetErrorCode()) } @@ -1378,7 +1389,7 @@ func (suite *ServiceSuite) TestStreamingSearch() { suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() paramtable.Get().Save(paramtable.Get().QueryNodeCfg.UseStreamComputing.Key, "true") - creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType) + creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, false) req := &querypb.SearchRequest{ Req: creq, FromShardLeader: true, @@ -1391,6 +1402,7 @@ func (suite *ServiceSuite) TestStreamingSearch() { rsp, err := suite.node.SearchSegments(ctx, req) suite.NoError(err) + suite.Equal(false, rsp.GetIsTopkReduce()) suite.Equal(commonpb.ErrorCode_Success, rsp.GetStatus().GetErrorCode()) } @@ -1399,7 +1411,7 @@ func (suite *ServiceSuite) TestStreamingSearchGrowing() { // pre suite.TestWatchDmChannelsInt64() paramtable.Get().Save(paramtable.Get().QueryNodeCfg.UseStreamComputing.Key, "true") - creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType) + creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType, false) req := &querypb.SearchRequest{ Req: creq, FromShardLeader: true, diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go index 4952005dccabd..40a6cc01d1b92 100644 --- a/pkg/metrics/proxy_metrics.go +++ b/pkg/metrics/proxy_metrics.go @@ -389,6 +389,25 @@ var ( Name: 
"max_insert_rate", Help: "max insert rate", }, []string{"node_id", "scope"}) + + // ProxyRetrySearchCount records the retry search count when result count does not meet limit and topk reduce is on + ProxyRetrySearchCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "retry_search_cnt", + Help: "counter of retry search", + }, []string{nodeIDLabelName, queryTypeLabelName, collectionName}) + + // ProxyRetrySearchResultInsufficientCount records the retry searches (run without reducing topk) whose results still do not meet the result limit. + // In that case the cause is more likely not index-related, e.g. not enough entities for a very big k, duplicate pks, etc. + ProxyRetrySearchResultInsufficientCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "retry_search_result_insufficient_cnt", + Help: "counter of retry search which does not have enough results", + }, []string{nodeIDLabelName, queryTypeLabelName, collectionName}) ) // RegisterProxy registers Proxy metrics @@ -447,6 +466,8 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxyReqInQueueLatency) registry.MustRegister(MaxInsertRate) + registry.MustRegister(ProxyRetrySearchCount) + registry.MustRegister(ProxyRetrySearchResultInsufficientCount) RegisterStreamingServiceClient(registry) } @@ -552,4 +573,24 @@ func CleanupProxyCollectionMetrics(nodeID int64, collection string) { nodeIDLabelName: strconv.FormatInt(nodeID, 10), msgTypeLabelName: UpsertLabel, collectionName: collection, }) + ProxyRetrySearchCount.Delete(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + queryTypeLabelName: SearchLabel, + collectionName: collection, + }) + ProxyRetrySearchCount.Delete(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + queryTypeLabelName: HybridSearchLabel, + collectionName: collection, + }) + 
ProxyRetrySearchResultInsufficientCount.Delete(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + queryTypeLabelName: SearchLabel, + collectionName: collection, + }) + ProxyRetrySearchResultInsufficientCount.Delete(prometheus.Labels{ + nodeIDLabelName: strconv.FormatInt(nodeID, 10), + queryTypeLabelName: HybridSearchLabel, + collectionName: collection, + }) } diff --git a/pkg/util/paramtable/autoindex_param.go b/pkg/util/paramtable/autoindex_param.go index 385b2f9baad40..3a99038703af5 100644 --- a/pkg/util/paramtable/autoindex_param.go +++ b/pkg/util/paramtable/autoindex_param.go @@ -30,8 +30,9 @@ import ( // ///////////////////////////////////////////////////////////////////////////// // --- common --- type autoIndexConfig struct { - Enable ParamItem `refreshable:"true"` - EnableOptimize ParamItem `refreshable:"true"` + Enable ParamItem `refreshable:"true"` + EnableOptimize ParamItem `refreshable:"true"` + EnableResultLimitCheck ParamItem `refreshable:"true"` IndexParams ParamItem `refreshable:"true"` SparseIndexParams ParamItem `refreshable:"true"` @@ -76,6 +77,14 @@ func (p *autoIndexConfig) init(base *BaseTable) { } p.EnableOptimize.Init(base.mgr) + p.EnableResultLimitCheck = ParamItem{ + Key: "autoIndex.resultLimitCheck", + Version: "2.5.0", + DefaultValue: "true", + PanicIfEmpty: true, + } + p.EnableResultLimitCheck.Init(base.mgr) + p.IndexParams = ParamItem{ Key: "autoIndex.params.build", Version: "2.2.0",