Skip to content

Commit

Permalink
enhance: Retry on inconsistent requery result (#31713)
Browse files Browse the repository at this point in the history
Make retry on ErrInConsistentRequery in proxy rather than in every SDK.

issue: #31642

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Mar 30, 2024
1 parent 88d426f commit 417c678
Showing 1 changed file with 38 additions and 1 deletion.
39 changes: 38 additions & 1 deletion internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -2626,8 +2627,26 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
return it.result, nil
}

// Search search the most similar records of requests.
// Search searches the most similar records of requests.
func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
var err error
rsp := &milvuspb.SearchResults{
Status: merr.Success(),
}
err2 := retry.Handle(ctx, func() (bool, error) {
rsp, err = node.search(ctx, request)
if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) {
return true, merr.Error(rsp.GetStatus())
}
return false, nil
})
if err2 != nil {
rsp.Status = merr.Status(err2)
}
return rsp, err
}

func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
Expand Down Expand Up @@ -2804,6 +2823,24 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
}

func (node *Proxy) HybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest) (*milvuspb.SearchResults, error) {
var err error
rsp := &milvuspb.SearchResults{
Status: merr.Success(),
}
err2 := retry.Handle(ctx, func() (bool, error) {
rsp, err = node.hybridSearch(ctx, request)
if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) {
return true, merr.Error(rsp.GetStatus())
}
return false, nil
})
if err2 != nil {
rsp.Status = merr.Status(err2)
}
return rsp, err
}

func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSearchRequest) (*milvuspb.SearchResults, error) {
receiveSize := proto.Size(request)
metrics.ProxyReceiveBytes.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
Expand Down

0 comments on commit 417c678

Please sign in to comment.