Skip to content

Commit

Permalink
fix: Support retry on milvus errors and retry on ErrInconsistentRequery
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper committed Mar 18, 2024
1 parent 1a1a092 commit f798a4b
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 5 deletions.
16 changes: 11 additions & 5 deletions client/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"github.com/milvus-io/milvus-sdk-go/v2/merr"
)

const (
Expand Down Expand Up @@ -79,12 +78,16 @@ func (c *GrpcClient) HybridSearch(ctx context.Context, collName string, partitio
RankParams: params,
}

result, err := c.Service.HybridSearch(ctx, req)

err = merr.CheckRPCCall(result, err)
r, err := RetryOnMilvusErrors(ctx, func() (interface{}, error) {
return c.Service.HybridSearch(ctx, req)
}, OnMerrCodes(2200))
if err != nil {
return nil, err
}
result := r.(*milvuspb.SearchResults)
if err = handleRespStatus(result.GetStatus()); err != nil {
return nil, err
}

return c.handleSearchResult(schema, outputFields, nq, result)
}
Expand Down Expand Up @@ -117,10 +120,13 @@ func (c *GrpcClient) Search(ctx context.Context, collName string, partitions []s
return nil, err
}

resp, err := c.Service.Search(ctx, req)
r, err := RetryOnMilvusErrors(ctx, func() (interface{}, error) {
return c.Service.Search(ctx, req)
}, OnMerrCodes(2200))
if err != nil {
return nil, err
}
resp := r.(*milvuspb.SearchResults)
if err := handleRespStatus(resp.GetStatus()); err != nil {
return nil, err
}
Expand Down
141 changes: 141 additions & 0 deletions client/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"
"time"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
)

type config struct {
attempts uint
sleep time.Duration
maxSleepTime time.Duration
merrCodes []int32
}

func newDefaultConfig() *config {
return &config{
attempts: uint(10),
sleep: 200 * time.Millisecond,
maxSleepTime: 3 * time.Second,
}
}

// Option is used to config the retry function.
type Option func(*config)

// Attempts is used to config the max retry times.
func Attempts(attempts uint) Option {
return func(c *config) {
c.attempts = attempts
}
}

// Sleep is used to config the initial interval time of each execution.
func Sleep(sleep time.Duration) Option {
return func(c *config) {
c.sleep = sleep
// ensure max retry interval is always larger than retry interval
if c.sleep*2 > c.maxSleepTime {
c.maxSleepTime = 2 * c.sleep
}
}
}

// MaxSleepTime is used to config the max interval time of each execution.
func MaxSleepTime(maxSleepTime time.Duration) Option {
return func(c *config) {
// ensure max retry interval is always larger than retry interval
if c.sleep*2 > maxSleepTime {
c.maxSleepTime = 2 * c.sleep
} else {
c.maxSleepTime = maxSleepTime
}
}
}

func OnMerrCodes(codes ...int32) Option {
return func(c *config) {
c.merrCodes = append(c.merrCodes, codes...)
}
}

func contains(codes []int32, target int32) bool {
for _, c := range codes {
if c == target {
return true
}
}
return false
}

func RetryOnMilvusErrors(ctx context.Context, fn func() (interface{}, error), opts ...Option) (interface{}, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

c := newDefaultConfig()
for _, opt := range opts {
opt(c)
}

if len(c.merrCodes) == 0 {
return fn()
}

var lastResp interface{}
for i := uint(0); i < c.attempts; i++ {
resp, err := fn()
if err != nil {
return resp, err
}
var code int32
switch r := resp.(type) {
case interface{ GetStatus() *commonpb.Status }:
code = r.GetStatus().GetCode()
case interface{ GetCode() int32 }:
code = r.GetCode()
default:
return resp, nil
}
if code == 0 || !contains(c.merrCodes, code) {
return resp, nil
}

deadline, ok := ctx.Deadline()
if ok && time.Until(deadline) < c.sleep {
return resp, context.DeadlineExceeded
}

lastResp = resp

select {
case <-time.After(c.sleep):
case <-ctx.Done():
return lastResp, ctx.Err()
}

c.sleep *= 2
if c.sleep > c.maxSleepTime {
c.sleep = c.maxSleepTime
}
}
return lastResp, nil
}
180 changes: 180 additions & 0 deletions client/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
)

func TestRetryOnMilvusErrors(t *testing.T) {
ctx := context.Background()

n := 0
testFn := func() (interface{}, error) {
if n < 3 {
n++
return &commonpb.Status{
Code: 0,
}, nil
}
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100))
assert.Equal(t, int32(0), resp.(interface{ GetCode() int32 }).GetCode())
assert.NoError(t, err)
t.Log(resp)
}

func TestOnNoCode(t *testing.T) {
ctx := context.Background()

n := 0
testFn := func() (interface{}, error) {
if n < 3 {
n++
return &commonpb.Status{
Code: 0,
}, nil
}
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn)
assert.Equal(t, int32(0), resp.(interface{ GetCode() int32 }).GetCode())
assert.NoError(t, err)
t.Log(resp)
}

func TestReturnErr(t *testing.T) {
ctx := context.Background()

testFn := func() (interface{}, error) {
return nil, errors.New("mock err")
}

_, err := RetryOnMilvusErrors(ctx, testFn)
assert.Error(t, err)
t.Log(err)
}

func TestAttempts(t *testing.T) {
ctx := context.Background()

testFn := func() (interface{}, error) {
t.Log("executed")
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100), Attempts(1))
assert.Equal(t, int32(100), resp.(interface{ GetCode() int32 }).GetCode())
assert.NoError(t, err)
t.Log(resp)
}

func TestMaxSleepTime(t *testing.T) {
ctx := context.Background()

testFn := func() (interface{}, error) {
t.Log("executed")
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100), Attempts(3), MaxSleepTime(200*time.Millisecond))
assert.Equal(t, int32(100), resp.(interface{ GetCode() int32 }).GetCode())
assert.NoError(t, err)
t.Log(resp)
}

func TestSleep(t *testing.T) {
ctx := context.Background()

testFn := func() (interface{}, error) {
t.Log("executed")
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100), Attempts(3), Sleep(500*time.Millisecond))
assert.Equal(t, int32(100), resp.(interface{ GetCode() int32 }).GetCode())
assert.NoError(t, err)
t.Log(resp)
}

func TestContextDeadline(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

testFn := func() (interface{}, error) {
t.Log("executed")
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100))
assert.Equal(t, int32(100), resp.(interface{ GetCode() int32 }).GetCode())
assert.Error(t, err)
assert.ErrorIs(t, err, context.DeadlineExceeded)
t.Log(resp)
}

func TestContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

testFn := func() (interface{}, error) {
t.Log("executed")
return &commonpb.Status{
Reason: "mock err",
Code: 100,
}, nil
}

go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()

resp, err := RetryOnMilvusErrors(ctx, testFn, OnMerrCodes(100))
assert.Equal(t, int32(100), resp.(interface{ GetCode() int32 }).GetCode())
assert.Error(t, err)
assert.ErrorIs(t, err, context.Canceled)
t.Log(resp)
t.Log(err)
}
Loading

0 comments on commit f798a4b

Please sign in to comment.