From 1b4e28b97f693ef13a2eb13d55e7e96ee1285883 Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Thu, 23 May 2024 20:03:40 +0800 Subject: [PATCH] enhance: Check with the proxy rate limiter when delete gets data by query. (#30891) relate: https://github.com/milvus-io/milvus/issues/30927 --------- Signed-off-by: aoiasd --- configs/milvus.yaml | 13 +-- internal/proxy/impl.go | 7 ++ internal/proxy/impl_test.go | 23 +++--- internal/proxy/proxy.go | 6 +- internal/proxy/proxy_test.go | 2 +- internal/proxy/rate_limit_interceptor_test.go | 4 + internal/proxy/simple_rate_limiter.go | 17 +++- internal/proxy/simple_rate_limiter_test.go | 18 ++--- internal/proxy/task_delete.go | 22 ++++- internal/proxy/task_delete_test.go | 80 +++++++++++++++++++ internal/types/types.go | 1 + pkg/util/paramtable/quota_param.go | 30 +++++++ 12 files changed, 192 insertions(+), 31 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 5410ecce75b15..b526ad733703e 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -631,6 +631,14 @@ quotaAndLimits: # collects metrics from Proxies, Query cluster and Data cluster. # seconds, (0 ~ 65536) quotaCenterCollectInterval: 3 + limits: + allocRetryTimes: 15 # number of retries when allocating forwarded delete data from the rate limiter fails + allocWaitInterval: 1000 # wait duration between retries when allocating forwarded delete data fails, in milliseconds + complexDeleteLimitEnable: false # whether complex delete checks forwarded data against the rate limiter + maxCollectionNum: 65536 + maxCollectionNumPerDB: 65536 + maxInsertSize: -1 # maximum size of a single insert request, in bytes, -1 means no limit + maxResourceGroupNumOfQueryNode: 1024 # maximum number of resource groups of query nodes ddl: enabled: false collectionRate: -1 # qps, default no limit, rate for CreateCollection, DropCollection, LoadCollection, ReleaseCollection @@ -711,11 +719,6 @@ quotaAndLimits: max: -1 # qps, default no limit partition: max: -1 # qps, default no limit - limits: - maxCollectionNum: 65536 - maxCollectionNumPerDB: 65536 - maxInsertSize: -1 # maximum size of a single insert request, in bytes, -1 means no limit - maxResourceGroupNumOfQueryNode: 1024 # maximum number of resource groups of query nodes limitWriting: # forceDeny false means dml requests are allowed (except for some # specific conditions, such as memory of nodes to water marker), true means always reject all dml requests.
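Note on the two new retry knobs: together they bound how long a rate-limited complex delete can stall inside the proxy before the request fails. A minimal, self-contained sketch of that worst-case arithmetic, using the defaults above (15 retries, 1000 ms wait); the figures are illustrative only, not measurements:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults from quotaAndLimits.limits above.
	allocRetryTimes := 15
	allocWaitInterval := 1000 * time.Millisecond

	// If every allocation attempt keeps hitting the rate limit, the proxy
	// sleeps between attempts and eventually gives up, so one forwarded
	// delete batch can stall for roughly this long before the request fails.
	worstCase := time.Duration(allocRetryTimes) * allocWaitInterval
	fmt.Println("worst-case stall per delete batch:", worstCase) // ~15s
}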
diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 3947bd510aa95..340b21fa33300 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proxy/connection" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/hookutil" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/common" @@ -2642,6 +2643,11 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc() + var limiter types.Limiter + if node.enableComplexDeleteLimit { + limiter, _ = node.GetRateLimiter() + } + dr := &deleteRunner{ req: request, idAllocator: node.rowIDAllocator, @@ -2650,6 +2656,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) chTicker: node.chTicker, queue: node.sched.dmQueue, lb: node.lbPolicy, + limiter: limiter, } log.Debug("init delete runner in Proxy") diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index 9577ee9b6d80d..883e44136d778 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -80,7 +80,7 @@ func TestProxy_InvalidateCollectionMetaCache_remove_stream(t *testing.T) { func TestProxy_CheckHealth(t *testing.T) { t.Run("not healthy", func(t *testing.T) { node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - node.simpleLimiter = NewSimpleLimiter() + node.simpleLimiter = NewSimpleLimiter(0, 0) node.UpdateStateCode(commonpb.StateCode_Abnormal) ctx := context.Background() resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) @@ -98,7 +98,7 @@ func TestProxy_CheckHealth(t *testing.T) { dataCoord: NewDataCoordMock(), session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}, } - node.simpleLimiter = NewSimpleLimiter() + node.simpleLimiter = NewSimpleLimiter(0, 0) node.UpdateStateCode(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) @@ -131,7 +131,7 @@ func TestProxy_CheckHealth(t *testing.T) { queryCoord: qc, dataCoord: dataCoordMock, } - node.simpleLimiter = NewSimpleLimiter() + node.simpleLimiter = NewSimpleLimiter(0, 0) node.UpdateStateCode(commonpb.StateCode_Healthy) ctx := context.Background() resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) @@ -148,7 +148,7 @@ func TestProxy_CheckHealth(t *testing.T) { dataCoord: NewDataCoordMock(), queryCoord: qc, } - node.simpleLimiter = NewSimpleLimiter() + node.simpleLimiter = NewSimpleLimiter(0, 0) node.UpdateStateCode(commonpb.StateCode_Healthy) resp, err := node.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -243,7 +243,7 @@ func TestProxy_ResourceGroup(t *testing.T) { node, err := NewProxy(ctx, factory) assert.NoError(t, err) - node.simpleLimiter = NewSimpleLimiter() + node.simpleLimiter = NewSimpleLimiter(0, 0) node.UpdateStateCode(commonpb.StateCode_Healthy) qc := mocks.NewMockQueryCoordClient(t) @@ -335,7 +335,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) { node, err := NewProxy(ctx, factory) assert.NoError(t, err) - node.simpleLimiter = NewSimpleLimiter() + node.simpleLimiter = NewSimpleLimiter(0, 0) 
node.UpdateStateCode(commonpb.StateCode_Healthy) qc := mocks.NewMockQueryCoordClient(t) @@ -936,7 +936,7 @@ func TestProxyCreateDatabase(t *testing.T) { node.tsoAllocator = ×tampAllocator{ tso: newMockTimestampAllocatorInterface(), } - node.simpleLimiter = NewSimpleLimiter() + node.simpleLimiter = NewSimpleLimiter(0, 0) node.UpdateStateCode(commonpb.StateCode_Healthy) node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory) node.sched.ddQueue.setMaxTaskNum(10) @@ -996,7 +996,7 @@ func TestProxyDropDatabase(t *testing.T) { node.tsoAllocator = ×tampAllocator{ tso: newMockTimestampAllocatorInterface(), } - node.simpleLimiter = NewSimpleLimiter() + node.simpleLimiter = NewSimpleLimiter(0, 0) node.UpdateStateCode(commonpb.StateCode_Healthy) node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory) node.sched.ddQueue.setMaxTaskNum(10) @@ -1055,7 +1055,7 @@ func TestProxyListDatabase(t *testing.T) { node.tsoAllocator = ×tampAllocator{ tso: newMockTimestampAllocatorInterface(), } - node.simpleLimiter = NewSimpleLimiter() + node.simpleLimiter = NewSimpleLimiter(0, 0) node.UpdateStateCode(commonpb.StateCode_Healthy) node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory) node.sched.ddQueue.setMaxTaskNum(10) @@ -1111,7 +1111,7 @@ func TestProxyAlterDatabase(t *testing.T) { node.tsoAllocator = ×tampAllocator{ tso: newMockTimestampAllocatorInterface(), } - node.simpleLimiter = NewSimpleLimiter() + node.simpleLimiter = NewSimpleLimiter(0, 0) node.UpdateStateCode(commonpb.StateCode_Healthy) node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory) node.sched.ddQueue.setMaxTaskNum(10) @@ -1164,7 +1164,7 @@ func TestProxyDescribeDatabase(t *testing.T) { node.tsoAllocator = ×tampAllocator{ tso: newMockTimestampAllocatorInterface(), } - node.simpleLimiter = NewSimpleLimiter() + node.simpleLimiter = NewSimpleLimiter(0, 0) node.UpdateStateCode(commonpb.StateCode_Healthy) node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory) node.sched.ddQueue.setMaxTaskNum(10) @@ -1287,6 +1287,7 @@ func TestProxy_Delete(t *testing.T) { Expr: "pk in [1, 2, 3]", } cache := NewMockCache(t) + cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) cache.On("GetCollectionID", mock.Anything, // context.Context mock.AnythingOfType("string"), diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 22d3dfbb9bcd8..c0af10850aaa8 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -128,6 +128,9 @@ type Proxy struct { // materialized view enableMaterializedView bool + + // delete rate limiter + enableComplexDeleteLimit bool } // NewProxy returns a Proxy struct. 
@@ -146,7 +149,7 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) { factory: factory, searchResultCh: make(chan *internalpb.SearchResults, n), shardMgr: mgr, - simpleLimiter: NewSimpleLimiter(), + simpleLimiter: NewSimpleLimiter(Params.QuotaConfig.AllocWaitInterval.GetAsDuration(time.Millisecond), Params.QuotaConfig.AllocRetryTimes.GetAsUint()), lbPolicy: lbPolicy, resourceManager: resourceManager, replicateStreamManager: replicateStreamManager, @@ -287,6 +290,7 @@ func (node *Proxy) Init() error { node.chTicker = newChannelsTimeTicker(node.ctx, Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)/2, []string{}, node.sched.getPChanStatistics, tsoAllocator) log.Debug("create channels time ticker done", zap.String("role", typeutil.ProxyRole), zap.Duration("syncTimeTickInterval", syncTimeTickInterval)) + node.enableComplexDeleteLimit = Params.QuotaConfig.ComplexDeleteLimitEnable.GetAsBool() node.metricsCacheManager = metricsinfo.NewMetricsCacheManager() log.Debug("create metrics cache manager done", zap.String("role", typeutil.ProxyRole)) diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 9877d1243e3ac..298abede0c7da 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -299,7 +299,7 @@ func (s *proxyTestServer) startGrpc(ctx context.Context, wg *sync.WaitGroup, p * ctx, cancel := context.WithCancel(ctx) defer cancel() - s.simpleLimiter = NewSimpleLimiter() + s.simpleLimiter = NewSimpleLimiter(0, 0) opts := tracer.GetInterceptorOpts() s.grpcServer = grpc.NewServer( diff --git a/internal/proxy/rate_limit_interceptor_test.go b/internal/proxy/rate_limit_interceptor_test.go index 5440123da630a..53db9ede78c99 100644 --- a/internal/proxy/rate_limit_interceptor_test.go +++ b/internal/proxy/rate_limit_interceptor_test.go @@ -50,6 +50,10 @@ func (l *limiterMock) Check(dbID int64, collectionIDToPartIDs map[int64][]int64, return nil } +func (l *limiterMock) Alloc(ctx context.Context, dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error { + return l.Check(dbID, collectionIDToPartIDs, rt, n) +} + func TestRateLimitInterceptor(t *testing.T) { t.Run("test getRequestInfo", func(t *testing.T) { mockCache := NewMockCache(t) diff --git a/internal/proxy/simple_rate_limiter.go b/internal/proxy/simple_rate_limiter.go index 0a6b721c46b94..e5979242064fb 100644 --- a/internal/proxy/simple_rate_limiter.go +++ b/internal/proxy/simple_rate_limiter.go @@ -21,6 +21,7 @@ import ( "fmt" "strconv" "sync" + "time" "go.uber.org/zap" @@ -35,6 +36,7 @@ import ( "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/ratelimitutil" + "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -42,15 +44,26 @@ import ( type SimpleLimiter struct { quotaStatesMu sync.RWMutex rateLimiter *rlinternal.RateLimiterTree + + // for alloc + allocWaitInterval time.Duration + allocRetryTimes uint } // NewSimpleLimiter returns a new SimpleLimiter. 
-func NewSimpleLimiter() *SimpleLimiter { +func NewSimpleLimiter(allocWaitInterval time.Duration, allocRetryTimes uint) *SimpleLimiter { rootRateLimiter := newClusterLimiter() - m := &SimpleLimiter{rateLimiter: rlinternal.NewRateLimiterTree(rootRateLimiter)} + m := &SimpleLimiter{rateLimiter: rlinternal.NewRateLimiterTree(rootRateLimiter), allocWaitInterval: allocWaitInterval, allocRetryTimes: allocRetryTimes} return m } +// Alloc will retry till check pass or out of times. +func (m *SimpleLimiter) Alloc(ctx context.Context, dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error { + return retry.Do(ctx, func() error { + return m.Check(dbID, collectionIDToPartIDs, rt, n) + }, retry.Sleep(m.allocWaitInterval), retry.Attempts(m.allocRetryTimes)) +} + // Check checks if request would be limited or denied. func (m *SimpleLimiter) Check(dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error { if !Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() { diff --git a/internal/proxy/simple_rate_limiter_test.go b/internal/proxy/simple_rate_limiter_test.go index c19253c3dbc40..d9f555b4a87ce 100644 --- a/internal/proxy/simple_rate_limiter_test.go +++ b/internal/proxy/simple_rate_limiter_test.go @@ -40,7 +40,7 @@ func TestSimpleRateLimiter(t *testing.T) { bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue() paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true") - simpleLimiter := NewSimpleLimiter() + simpleLimiter := NewSimpleLimiter(0, 0) clusterRateLimiters := simpleLimiter.rateLimiter.GetRootLimiters() simpleLimiter.rateLimiter.GetOrCreateCollectionLimiters(0, collectionID, newDatabaseLimiter, @@ -83,7 +83,7 @@ func TestSimpleRateLimiter(t *testing.T) { t.Run("test global static limit", func(t *testing.T) { bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue() paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true") - simpleLimiter := NewSimpleLimiter() + simpleLimiter := NewSimpleLimiter(0, 0) clusterRateLimiters := simpleLimiter.rateLimiter.GetRootLimiters() collectionIDToPartIDs := map[int64][]int64{ @@ -136,7 +136,7 @@ func TestSimpleRateLimiter(t *testing.T) { }) t.Run("not enable quotaAndLimit", func(t *testing.T) { - simpleLimiter := NewSimpleLimiter() + simpleLimiter := NewSimpleLimiter(0, 0) bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue() paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "false") for _, rt := range internalpb.RateType_value { @@ -150,7 +150,7 @@ func TestSimpleRateLimiter(t *testing.T) { run := func(insertRate float64) { bakInsertRate := Params.QuotaConfig.DMLMaxInsertRate.GetValue() paramtable.Get().Save(Params.QuotaConfig.DMLMaxInsertRate.Key, fmt.Sprintf("%f", insertRate)) - simpleLimiter := NewSimpleLimiter() + simpleLimiter := NewSimpleLimiter(0, 0) bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue() paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true") err := simpleLimiter.Check(0, nil, internalpb.RateType_DMLInsert, 1*1024*1024) @@ -166,7 +166,7 @@ func TestSimpleRateLimiter(t *testing.T) { }) t.Run("test set rates", func(t *testing.T) { - simpleLimiter := NewSimpleLimiter() + simpleLimiter := NewSimpleLimiter(0, 0) zeroRates := getZeroCollectionRates() err := simpleLimiter.SetRates(newCollectionLimiterNode(map[int64]*proxypb.LimiterNode{ @@ -188,7 +188,7 @@ func TestSimpleRateLimiter(t *testing.T) { }) t.Run("test quota states", func(t *testing.T) { - simpleLimiter := NewSimpleLimiter() + 
simpleLimiter := NewSimpleLimiter(0, 0) err := simpleLimiter.SetRates(newCollectionLimiterNode(map[int64]*proxypb.LimiterNode{ 1: { // collection limiter @@ -257,7 +257,7 @@ func newCollectionLimiterNode(collectionLimiterNodes map[int64]*proxypb.LimiterN func TestRateLimiter(t *testing.T) { t.Run("test limit", func(t *testing.T) { - simpleLimiter := NewSimpleLimiter() + simpleLimiter := NewSimpleLimiter(0, 0) rootLimiters := simpleLimiter.rateLimiter.GetRootLimiters() for _, rt := range internalpb.RateType_value { rootLimiters.GetLimiters().Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1)) @@ -273,7 +273,7 @@ func TestRateLimiter(t *testing.T) { }) t.Run("test setRates", func(t *testing.T) { - simpleLimiter := NewSimpleLimiter() + simpleLimiter := NewSimpleLimiter(0, 0) collectionRateLimiters := simpleLimiter.rateLimiter.GetOrCreateCollectionLimiters(0, int64(1), newDatabaseLimiter, func() *rlinternal.RateLimiterNode { @@ -336,7 +336,7 @@ func TestRateLimiter(t *testing.T) { }) t.Run("test get error code", func(t *testing.T) { - simpleLimiter := NewSimpleLimiter() + simpleLimiter := NewSimpleLimiter(0, 0) collectionRateLimiters := simpleLimiter.rateLimiter.GetOrCreateCollectionLimiters(0, int64(1), newDatabaseLimiter, func() *rlinternal.RateLimiterNode { diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index 8822410576dd2..7beaffadcbbea 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -231,9 +231,11 @@ type deleteRunner struct { idAllocator allocator.Interface tsoAllocatorIns tsoAllocator + limiter types.Limiter // delete info schema *schemaInfo + dbID UniqueID collectionID UniqueID partitionID UniqueID partitionKeyMode bool @@ -259,6 +261,13 @@ func (dr *deleteRunner) Init(ctx context.Context) error { if err := validateCollectionName(collName); err != nil { return ErrWithLog(log, "Invalid collection name", err) } + + db, err := globalMetaCache.GetDatabaseInfo(ctx, dr.req.GetDbName()) + if err != nil { + return err + } + dr.dbID = db.dbID + dr.collectionID, err = globalMetaCache.GetCollectionID(ctx, dr.req.GetDbName(), collName) if err != nil { return ErrWithLog(log, "Failed to get collection id", err) @@ -428,7 +437,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe } taskCh := make(chan *deleteTask, 256) - go dr.receiveQueryResult(ctx, client, taskCh) + go dr.receiveQueryResult(ctx, client, taskCh, partitionIDs) var allQueryCnt int64 // wait all task finish for task := range taskCh { @@ -449,7 +458,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe } } -func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) { +func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask, partitionIDs []int64) { defer func() { close(taskCh) }() @@ -472,6 +481,15 @@ func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.Q return } + if dr.limiter != nil { + err := dr.limiter.Alloc(ctx, dr.dbID, map[int64][]int64{dr.collectionID: partitionIDs}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds())) + if err != nil { + dr.err = err + log.Warn("query stream for delete failed because rate limiter", zap.Int64("msgID", dr.msgID), zap.Error(err)) + return + } + } + task, err := dr.produce(ctx, result.GetIds()) if err != nil { dr.err = err diff --git 
a/internal/proxy/task_delete_test.go b/internal/proxy/task_delete_test.go index 4c973b803c476..657029001952f 100644 --- a/internal/proxy/task_delete_test.go +++ b/internal/proxy/task_delete_test.go @@ -118,6 +118,7 @@ func TestDeleteTask_GetChannels(t *testing.T) { mock.AnythingOfType("string"), mock.AnythingOfType("string"), ).Return(collectionID, nil) + globalMetaCache = cache chMgr := NewMockChannelsMgr(t) chMgr.EXPECT().getChannels(mock.Anything).Return(channels, nil) @@ -265,6 +266,19 @@ func TestDeleteRunner_Init(t *testing.T) { assert.Error(t, dr.Init(context.Background())) }) + t.Run("fail to get database info", func(t *testing.T) { + dr := deleteRunner{ + req: &milvuspb.DeleteRequest{ + CollectionName: collectionName, + }, + } + cache := NewMockCache(t) + cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")) + globalMetaCache = cache + + assert.Error(t, dr.Init(context.Background())) + }) + t.Run("fail to get collection id", func(t *testing.T) { dr := deleteRunner{ req: &milvuspb.DeleteRequest{ @@ -272,11 +286,13 @@ func TestDeleteRunner_Init(t *testing.T) { }, } cache := NewMockCache(t) + cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) cache.On("GetCollectionID", mock.Anything, // context.Context mock.AnythingOfType("string"), mock.AnythingOfType("string"), ).Return(int64(0), errors.New("mock GetCollectionID err")) + globalMetaCache = cache assert.Error(t, dr.Init(context.Background())) }) @@ -287,6 +303,7 @@ func TestDeleteRunner_Init(t *testing.T) { DbName: dbName, }} cache := NewMockCache(t) + cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) cache.On("GetCollectionID", mock.Anything, // context.Context mock.AnythingOfType("string"), @@ -309,6 +326,7 @@ func TestDeleteRunner_Init(t *testing.T) { PartitionName: partitionName, }} cache := NewMockCache(t) + cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) cache.On("GetCollectionID", mock.Anything, // context.Context mock.AnythingOfType("string"), @@ -347,6 +365,7 @@ func TestDeleteRunner_Init(t *testing.T) { }, } cache := NewMockCache(t) + cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) cache.On("GetCollectionID", mock.Anything, // context.Context mock.AnythingOfType("string"), @@ -372,6 +391,7 @@ func TestDeleteRunner_Init(t *testing.T) { }, } cache := NewMockCache(t) + cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) cache.On("GetCollectionID", mock.Anything, // context.Context mock.AnythingOfType("string"), @@ -405,6 +425,7 @@ func TestDeleteRunner_Init(t *testing.T) { chMgr: chMgr, } cache := NewMockCache(t) + cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) cache.On("GetCollectionID", mock.Anything, // context.Context mock.AnythingOfType("string"), @@ -656,6 +677,65 @@ func TestDeleteRunner_Run(t *testing.T) { assert.Error(t, dr.Run(ctx)) }) + t.Run("complex delete rate limit check failed", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockMgr := NewMockChannelsMgr(t) + qn := mocks.NewMockQueryNodeClient(t) + lb := NewMockLBPolicy(t) + + dr := deleteRunner{ + chMgr: mockMgr, + queue: queue.dmQueue, + schema: schema, + collectionID: collectionID, + partitionID: partitionID, + vChannels: channels, + idAllocator: idAllocator, + tsoAllocatorIns: 
tsoAllocator, + lb: lb, + limiter: &limiterMock{}, + result: &milvuspb.MutationResult{ + Status: merr.Success(), + IDs: &schemapb.IDs{ + IdField: nil, + }, + }, + req: &milvuspb.DeleteRequest{ + CollectionName: collectionName, + PartitionName: partitionName, + DbName: dbName, + Expr: "pk < 3", + }, + } + lb.EXPECT().Execute(mock.Anything, mock.Anything).Call.Return(func(ctx context.Context, workload CollectionWorkLoad) error { + return workload.exec(ctx, 1, qn, "") + }) + + qn.EXPECT().QueryStream(mock.Anything, mock.Anything).Call.Return( + func(ctx context.Context, in *querypb.QueryRequest, opts ...grpc.CallOption) querypb.QueryNode_QueryStreamClient { + client := streamrpc.NewLocalQueryClient(ctx) + server := client.CreateServer() + + server.Send(&internalpb.RetrieveResults{ + Status: merr.Success(), + Ids: &schemapb.IDs{ + IdField: &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: []int64{0, 1, 2}, + }, + }, + }, + }) + server.FinishSend(nil) + return client + }, nil) + + assert.Error(t, dr.Run(ctx)) + assert.Equal(t, int64(0), dr.result.DeleteCnt) + }) + t.Run("complex delete produce failed", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/types/types.go b/internal/types/types.go index 27acc7cac3d1b..93c85dc9e79ef 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -38,6 +38,7 @@ import ( // Otherwise, the request will pass. Limit also returns limit of limiter. type Limiter interface { Check(dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error + Alloc(ctx context.Context, dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error } // Component is the interface all services implement diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go index 33996a7b77dc7..3cc550e9e6eee 100644 --- a/pkg/util/paramtable/quota_param.go +++ b/pkg/util/paramtable/quota_param.go @@ -45,6 +45,9 @@ const ( type quotaConfig struct { QuotaAndLimitsEnabled ParamItem `refreshable:"false"` QuotaCenterCollectInterval ParamItem `refreshable:"false"` + AllocRetryTimes ParamItem `refreshable:"false"` + AllocWaitInterval ParamItem `refreshable:"false"` + ComplexDeleteLimitEnable ParamItem `refreshable:"false"` // ddl DDLLimitEnabled ParamItem `refreshable:"true"` @@ -2021,6 +2024,33 @@ MB/s, default no limit`, Export: true, } p.CoolOffSpeed.Init(base.mgr) + + p.AllocRetryTimes = ParamItem{ + Key: "quotaAndLimits.limits.allocRetryTimes", + Version: "2.4.0", + DefaultValue: "15", + Doc: `number of retries when allocating forwarded delete data from the rate limiter fails`, + Export: true, + } + p.AllocRetryTimes.Init(base.mgr) + + p.AllocWaitInterval = ParamItem{ + Key: "quotaAndLimits.limits.allocWaitInterval", + Version: "2.4.0", + DefaultValue: "1000", + Doc: `wait duration between retries when allocating forwarded delete data fails, in milliseconds`, + Export: true, + } + p.AllocWaitInterval.Init(base.mgr) + + p.ComplexDeleteLimitEnable = ParamItem{ + Key: "quotaAndLimits.limits.complexDeleteLimitEnable", + Version: "2.4.0", + DefaultValue: "false", + Doc: `whether complex delete checks forwarded data against the rate limiter`, + Export: true, + } + p.ComplexDeleteLimitEnable.Init(base.mgr) } func megaBytes2Bytes(f float64) float64 {
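How the new SimpleLimiter.Alloc behaves: it re-runs Check through the retry helper until the check passes or the configured attempts run out. A self-contained sketch of the same retry-until-pass shape using a plain loop instead of pkg/util/retry; the function and variable names here are illustrative, not Milvus APIs:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errRateLimited = errors.New("rate limited")

// allocWithRetry mirrors the shape of SimpleLimiter.Alloc in this patch:
// re-run a Check-style callback until it passes, sleeping between attempts,
// and return the last error once the attempts are used up.
func allocWithRetry(ctx context.Context, check func() error, wait time.Duration, attempts uint) error {
	var err error
	for i := uint(0); i < attempts; i++ {
		if err = check(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(wait):
		}
	}
	return err
}

func main() {
	// Pretend the limiter refuses the first two attempts, then admits the request.
	calls := 0
	check := func() error {
		calls++
		if calls < 3 {
			return errRateLimited
		}
		return nil
	}
	err := allocWithRetry(context.Background(), check, 10*time.Millisecond, 5)
	fmt.Println("attempts:", calls, "err:", err) // attempts: 3 err: <nil>
}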
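What the deleteRunner change amounts to: every batch of primary keys streamed back from the query nodes is charged against the DMLDelete quota (by the serialized size of the returned IDs) before it is turned into delete messages, and a rejected batch fails the request. A simplified, self-contained sketch of that per-batch gating; batchLimiter and the 8-bytes-per-PK sizing are stand-ins, not the real types.Limiter or proto.Size:

package main

import (
	"errors"
	"fmt"
)

// batchLimiter stands in for types.Limiter: Alloc charges n units (bytes of
// serialized IDs in the real code) against a remaining budget.
type batchLimiter struct{ remaining int }

func (l *batchLimiter) Alloc(n int) error {
	if n > l.remaining {
		return errors.New("rate limit exceeded")
	}
	l.remaining -= n
	return nil
}

// forwardDeletes mimics receiveQueryResult's gating: each batch of matched
// primary keys must be admitted by the limiter before delete tasks are
// produced; on rejection the remaining batches are dropped and the request fails.
func forwardDeletes(l *batchLimiter, batches [][]int64) (int, error) {
	produced := 0
	for _, ids := range batches {
		size := 8 * len(ids) // crude stand-in for proto.Size(result.GetIds())
		if err := l.Alloc(size); err != nil {
			return produced, err
		}
		produced += len(ids)
	}
	return produced, nil
}

func main() {
	l := &batchLimiter{remaining: 32} // enough budget for roughly four int64 PKs
	n, err := forwardDeletes(l, [][]int64{{1, 2, 3}, {4, 5, 6}})
	fmt.Println("forwarded:", n, "err:", err) // forwarded: 3 err: rate limit exceeded
}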
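Because Alloc is now part of types.Limiter, every implementation and test double has to provide both methods; an implementation with no retry semantics of its own can simply delegate Alloc to Check, as limiterMock does above. A compile-time sketch of such a stub; the denyAllLimiter type and its placement in package proxy are hypothetical, and the import paths are assumed to match the ones visible elsewhere in this patch:

package proxy

import (
	"context"
	"errors"

	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/types"
)

// denyAllLimiter always reports the quota as exhausted, which is enough to
// drive the failure path covered by the new
// "complex delete rate limit check failed" test.
type denyAllLimiter struct{}

func (denyAllLimiter) Check(dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error {
	return errors.New("simulated: rate limit exceeded")
}

func (denyAllLimiter) Alloc(ctx context.Context, dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error {
	// No retry semantics here, so Alloc just delegates to Check.
	return denyAllLimiter{}.Check(dbID, collectionIDToPartIDs, rt, n)
}

// Compile-time assertion that the stub satisfies the extended interface.
var _ types.Limiter = denyAllLimiter{}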