enhance: Check the proxy rate limiter when delete gets data by query (milvus-io#30891)

Related issue: milvus-io#30927

---------

Signed-off-by: aoiasd <[email protected]>
aoiasd authored May 23, 2024
1 parent c7be2ce commit 1b4e28b
Showing 12 changed files with 192 additions and 31 deletions.
13 changes: 8 additions & 5 deletions configs/milvus.yaml
@@ -631,6 +631,14 @@ quotaAndLimits:
# collects metrics from Proxies, Query cluster and Data cluster.
# seconds, (0 ~ 65536)
quotaCenterCollectInterval: 3
limits:
allocRetryTimes: 15 # retry times when a delete's forwarded-data allocation is rejected by the rate limiter
allocWaitInterval: 1000 # wait interval between those retries, in milliseconds
complexDeleteLimitEnable: false # whether complex (expression-based) deletes check their forwarded data against the rate limiter
maxCollectionNum: 65536
maxCollectionNumPerDB: 65536
maxInsertSize: -1 # maximum size of a single insert request, in bytes, -1 means no limit
maxResourceGroupNumOfQueryNode: 1024 # maximum number of resource groups of query nodes
ddl:
enabled: false
collectionRate: -1 # qps, default no limit, rate for CreateCollection, DropCollection, LoadCollection, ReleaseCollection
@@ -711,11 +719,6 @@ quotaAndLimits:
max: -1 # qps, default no limit
partition:
max: -1 # qps, default no limit
limits:
maxCollectionNum: 65536
maxCollectionNumPerDB: 65536
maxInsertSize: -1 # maximum size of a single insert request, in bytes, -1 means no limit
maxResourceGroupNumOfQueryNode: 1024 # maximum number of resource groups of query nodes
limitWriting:
# forceDeny false means dml requests are allowed (except under some specific
# conditions, such as node memory reaching the water mark); true means always reject all dml requests.
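The three settings added above control how persistently a complex (expression-based) delete tries to reserve quota before it gives up. As a rough illustration of what the defaults mean in practice (a sketch that assumes one full allocWaitInterval sleep after each failed attempt, which matches the retry helper introduced later in this commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults from the quotaAndLimits.limits section above.
	allocRetryTimes := 15
	allocWaitInterval := 1000 * time.Millisecond

	// Assuming one sleep of allocWaitInterval after each failed attempt,
	// a throttled delete can block for roughly this long before giving up.
	worstCase := time.Duration(allocRetryTimes) * allocWaitInterval
	fmt.Println("approximate worst-case wait:", worstCase) // prints 15s
}

In other words, with the defaults a throttled delete request blocks for up to about 15 seconds before returning a rate-limit error to the client.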
7 changes: 7 additions & 0 deletions internal/proxy/impl.go
@@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proxy/connection"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/common"
@@ -2642,6 +2643,11 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.TotalLabel, request.GetDbName(), request.GetCollectionName()).Inc()

var limiter types.Limiter
if node.enableComplexDeleteLimit {
limiter, _ = node.GetRateLimiter()
}

dr := &deleteRunner{
req: request,
idAllocator: node.rowIDAllocator,
@@ -2650,6 +2656,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
chTicker: node.chTicker,
queue: node.sched.dmQueue,
lb: node.lbPolicy,
limiter: limiter,
}

log.Debug("init delete runner in Proxy")
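For the new limiter field to be usable here, the types.Limiter interface has to offer both a one-shot check and a blocking allocation. The interface definition itself is not part of this diff; the sketch below is inferred from the SimpleLimiter methods and the limiterMock changes elsewhere in this commit, with RateType standing in for internalpb.RateType so the sketch is self-contained:

package types // hypothetical placement for illustration; the real definition lives under internal/types

import "context"

// RateType stands in for internalpb.RateType (an int32-based enum).
type RateType int32

// Limiter, as this commit appears to require it: Check is a single, non-blocking
// quota check; Alloc keeps retrying Check until it passes or the retry budget is spent.
type Limiter interface {
	Check(dbID int64, collectionIDToPartIDs map[int64][]int64, rt RateType, n int) error
	Alloc(ctx context.Context, dbID int64, collectionIDToPartIDs map[int64][]int64, rt RateType, n int) error
}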
23 changes: 12 additions & 11 deletions internal/proxy/impl_test.go
@@ -80,7 +80,7 @@ func TestProxy_InvalidateCollectionMetaCache_remove_stream(t *testing.T) {
func TestProxy_CheckHealth(t *testing.T) {
t.Run("not healthy", func(t *testing.T) {
node := &Proxy{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
node.simpleLimiter = NewSimpleLimiter()
node.simpleLimiter = NewSimpleLimiter(0, 0)
node.UpdateStateCode(commonpb.StateCode_Abnormal)
ctx := context.Background()
resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@@ -98,7 +98,7 @@ func TestProxy_CheckHealth(t *testing.T) {
dataCoord: NewDataCoordMock(),
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
}
node.simpleLimiter = NewSimpleLimiter()
node.simpleLimiter = NewSimpleLimiter(0, 0)
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@@ -131,7 +131,7 @@ func TestProxy_CheckHealth(t *testing.T) {
queryCoord: qc,
dataCoord: dataCoordMock,
}
node.simpleLimiter = NewSimpleLimiter()
node.simpleLimiter = NewSimpleLimiter(0, 0)
node.UpdateStateCode(commonpb.StateCode_Healthy)
ctx := context.Background()
resp, err := node.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@@ -148,7 +148,7 @@ func TestProxy_CheckHealth(t *testing.T) {
dataCoord: NewDataCoordMock(),
queryCoord: qc,
}
node.simpleLimiter = NewSimpleLimiter()
node.simpleLimiter = NewSimpleLimiter(0, 0)
node.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err := node.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@@ -243,7 +243,7 @@ func TestProxy_ResourceGroup(t *testing.T) {

node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.simpleLimiter = NewSimpleLimiter()
node.simpleLimiter = NewSimpleLimiter(0, 0)
node.UpdateStateCode(commonpb.StateCode_Healthy)

qc := mocks.NewMockQueryCoordClient(t)
@@ -335,7 +335,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) {

node, err := NewProxy(ctx, factory)
assert.NoError(t, err)
node.simpleLimiter = NewSimpleLimiter()
node.simpleLimiter = NewSimpleLimiter(0, 0)
node.UpdateStateCode(commonpb.StateCode_Healthy)

qc := mocks.NewMockQueryCoordClient(t)
@@ -936,7 +936,7 @@ func TestProxyCreateDatabase(t *testing.T) {
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
node.simpleLimiter = NewSimpleLimiter()
node.simpleLimiter = NewSimpleLimiter(0, 0)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
node.sched.ddQueue.setMaxTaskNum(10)
@@ -996,7 +996,7 @@ func TestProxyDropDatabase(t *testing.T) {
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
node.simpleLimiter = NewSimpleLimiter()
node.simpleLimiter = NewSimpleLimiter(0, 0)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
node.sched.ddQueue.setMaxTaskNum(10)
@@ -1055,7 +1055,7 @@ func TestProxyListDatabase(t *testing.T) {
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
node.simpleLimiter = NewSimpleLimiter()
node.simpleLimiter = NewSimpleLimiter(0, 0)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
node.sched.ddQueue.setMaxTaskNum(10)
@@ -1111,7 +1111,7 @@ func TestProxyAlterDatabase(t *testing.T) {
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
node.simpleLimiter = NewSimpleLimiter()
node.simpleLimiter = NewSimpleLimiter(0, 0)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
node.sched.ddQueue.setMaxTaskNum(10)
@@ -1164,7 +1164,7 @@ func TestProxyDescribeDatabase(t *testing.T) {
node.tsoAllocator = &timestampAllocator{
tso: newMockTimestampAllocatorInterface(),
}
node.simpleLimiter = NewSimpleLimiter()
node.simpleLimiter = NewSimpleLimiter(0, 0)
node.UpdateStateCode(commonpb.StateCode_Healthy)
node.sched, err = newTaskScheduler(ctx, node.tsoAllocator, node.factory)
node.sched.ddQueue.setMaxTaskNum(10)
@@ -1287,6 +1287,7 @@ func TestProxy_Delete(t *testing.T) {
Expr: "pk in [1, 2, 3]",
}
cache := NewMockCache(t)
cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil)
cache.On("GetCollectionID",
mock.Anything, // context.Context
mock.AnythingOfType("string"),
6 changes: 5 additions & 1 deletion internal/proxy/proxy.go
@@ -128,6 +128,9 @@ type Proxy struct {

// materialized view
enableMaterializedView bool

// delete rate limiter
enableComplexDeleteLimit bool
}

// NewProxy returns a Proxy struct.
@@ -146,7 +149,7 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) {
factory: factory,
searchResultCh: make(chan *internalpb.SearchResults, n),
shardMgr: mgr,
simpleLimiter: NewSimpleLimiter(),
simpleLimiter: NewSimpleLimiter(Params.QuotaConfig.AllocWaitInterval.GetAsDuration(time.Millisecond), Params.QuotaConfig.AllocRetryTimes.GetAsUint()),
lbPolicy: lbPolicy,
resourceManager: resourceManager,
replicateStreamManager: replicateStreamManager,
@@ -287,6 +290,7 @@ func (node *Proxy) Init() error {
node.chTicker = newChannelsTimeTicker(node.ctx, Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)/2, []string{}, node.sched.getPChanStatistics, tsoAllocator)
log.Debug("create channels time ticker done", zap.String("role", typeutil.ProxyRole), zap.Duration("syncTimeTickInterval", syncTimeTickInterval))

node.enableComplexDeleteLimit = Params.QuotaConfig.ComplexDeleteLimitEnable.GetAsBool()
node.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
log.Debug("create metrics cache manager done", zap.String("role", typeutil.ProxyRole))

2 changes: 1 addition & 1 deletion internal/proxy/proxy_test.go
@@ -299,7 +299,7 @@ func (s *proxyTestServer) startGrpc(ctx context.Context, wg *sync.WaitGroup, p *
ctx, cancel := context.WithCancel(ctx)
defer cancel()

s.simpleLimiter = NewSimpleLimiter()
s.simpleLimiter = NewSimpleLimiter(0, 0)

opts := tracer.GetInterceptorOpts()
s.grpcServer = grpc.NewServer(
4 changes: 4 additions & 0 deletions internal/proxy/rate_limit_interceptor_test.go
@@ -50,6 +50,10 @@ func (l *limiterMock) Check(dbID int64, collectionIDToPartIDs map[int64][]int64,
return nil
}

func (l *limiterMock) Alloc(ctx context.Context, dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error {
return l.Check(dbID, collectionIDToPartIDs, rt, n)
}

func TestRateLimitInterceptor(t *testing.T) {
t.Run("test getRequestInfo", func(t *testing.T) {
mockCache := NewMockCache(t)
17 changes: 15 additions & 2 deletions internal/proxy/simple_rate_limiter.go
@@ -21,6 +21,7 @@ import (
"fmt"
"strconv"
"sync"
"time"

"go.uber.org/zap"

Expand All @@ -35,22 +36,34 @@ import (
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// SimpleLimiter is implemented based on Limiter interface
type SimpleLimiter struct {
quotaStatesMu sync.RWMutex
rateLimiter *rlinternal.RateLimiterTree

// for alloc
allocWaitInterval time.Duration
allocRetryTimes uint
}

// NewSimpleLimiter returns a new SimpleLimiter.
func NewSimpleLimiter() *SimpleLimiter {
func NewSimpleLimiter(allocWaitInterval time.Duration, allocRetryTimes uint) *SimpleLimiter {
rootRateLimiter := newClusterLimiter()
m := &SimpleLimiter{rateLimiter: rlinternal.NewRateLimiterTree(rootRateLimiter)}
m := &SimpleLimiter{rateLimiter: rlinternal.NewRateLimiterTree(rootRateLimiter), allocWaitInterval: allocWaitInterval, allocRetryTimes: allocRetryTimes}
return m
}

// Alloc retries Check until it passes or the retry attempts are exhausted.
func (m *SimpleLimiter) Alloc(ctx context.Context, dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error {
return retry.Do(ctx, func() error {
return m.Check(dbID, collectionIDToPartIDs, rt, n)
}, retry.Sleep(m.allocWaitInterval), retry.Attempts(m.allocRetryTimes))
}

// Check checks if request would be limited or denied.
func (m *SimpleLimiter) Check(dbID int64, collectionIDToPartIDs map[int64][]int64, rt internalpb.RateType, n int) error {
if !Params.QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() {
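Alloc delegates its retry policy to the shared retry utility (retry.Sleep plus retry.Attempts). The standalone sketch below models the same semantics in plain Go; it is an illustration, not the actual retry.Do implementation, and the exact attempt and sleep accounting of the real helper may differ slightly:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// allocWithRetry models SimpleLimiter.Alloc: retry check() until it passes,
// the attempts are exhausted, or ctx is cancelled.
func allocWithRetry(ctx context.Context, check func() error, wait time.Duration, attempts uint) error {
	var err error
	for i := uint(0); i < attempts; i++ {
		if err = check(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(wait):
		}
	}
	return err
}

func main() {
	remaining := 3
	check := func() error {
		if remaining > 0 {
			remaining--
			return errors.New("rate limit exceeded")
		}
		return nil
	}
	// Succeeds on the 4th attempt; with fewer than 4 attempts it would return the last error.
	err := allocWithRetry(context.Background(), check, 10*time.Millisecond, 15)
	fmt.Println("alloc result:", err)
}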
18 changes: 9 additions & 9 deletions internal/proxy/simple_rate_limiter_test.go
@@ -40,7 +40,7 @@ func TestSimpleRateLimiter(t *testing.T) {
bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue()
paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true")

simpleLimiter := NewSimpleLimiter()
simpleLimiter := NewSimpleLimiter(0, 0)
clusterRateLimiters := simpleLimiter.rateLimiter.GetRootLimiters()

simpleLimiter.rateLimiter.GetOrCreateCollectionLimiters(0, collectionID, newDatabaseLimiter,
@@ -83,7 +83,7 @@ func TestSimpleRateLimiter(t *testing.T) {
t.Run("test global static limit", func(t *testing.T) {
bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue()
paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true")
simpleLimiter := NewSimpleLimiter()
simpleLimiter := NewSimpleLimiter(0, 0)
clusterRateLimiters := simpleLimiter.rateLimiter.GetRootLimiters()

collectionIDToPartIDs := map[int64][]int64{
@@ -136,7 +136,7 @@ func TestSimpleRateLimiter(t *testing.T) {
})

t.Run("not enable quotaAndLimit", func(t *testing.T) {
simpleLimiter := NewSimpleLimiter()
simpleLimiter := NewSimpleLimiter(0, 0)
bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue()
paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "false")
for _, rt := range internalpb.RateType_value {
@@ -150,7 +150,7 @@ func TestSimpleRateLimiter(t *testing.T) {
run := func(insertRate float64) {
bakInsertRate := Params.QuotaConfig.DMLMaxInsertRate.GetValue()
paramtable.Get().Save(Params.QuotaConfig.DMLMaxInsertRate.Key, fmt.Sprintf("%f", insertRate))
simpleLimiter := NewSimpleLimiter()
simpleLimiter := NewSimpleLimiter(0, 0)
bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue()
paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true")
err := simpleLimiter.Check(0, nil, internalpb.RateType_DMLInsert, 1*1024*1024)
@@ -166,7 +166,7 @@ func TestSimpleRateLimiter(t *testing.T) {
})

t.Run("test set rates", func(t *testing.T) {
simpleLimiter := NewSimpleLimiter()
simpleLimiter := NewSimpleLimiter(0, 0)
zeroRates := getZeroCollectionRates()

err := simpleLimiter.SetRates(newCollectionLimiterNode(map[int64]*proxypb.LimiterNode{
@@ -188,7 +188,7 @@ func TestSimpleRateLimiter(t *testing.T) {
})

t.Run("test quota states", func(t *testing.T) {
simpleLimiter := NewSimpleLimiter()
simpleLimiter := NewSimpleLimiter(0, 0)
err := simpleLimiter.SetRates(newCollectionLimiterNode(map[int64]*proxypb.LimiterNode{
1: {
// collection limiter
@@ -257,7 +257,7 @@ func newCollectionLimiterNode(collectionLimiterNodes map[int64]*proxypb.LimiterN

func TestRateLimiter(t *testing.T) {
t.Run("test limit", func(t *testing.T) {
simpleLimiter := NewSimpleLimiter()
simpleLimiter := NewSimpleLimiter(0, 0)
rootLimiters := simpleLimiter.rateLimiter.GetRootLimiters()
for _, rt := range internalpb.RateType_value {
rootLimiters.GetLimiters().Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1))
@@ -273,7 +273,7 @@ func TestRateLimiter(t *testing.T) {
})

t.Run("test setRates", func(t *testing.T) {
simpleLimiter := NewSimpleLimiter()
simpleLimiter := NewSimpleLimiter(0, 0)

collectionRateLimiters := simpleLimiter.rateLimiter.GetOrCreateCollectionLimiters(0, int64(1), newDatabaseLimiter,
func() *rlinternal.RateLimiterNode {
@@ -336,7 +336,7 @@ func TestRateLimiter(t *testing.T) {
})

t.Run("test get error code", func(t *testing.T) {
simpleLimiter := NewSimpleLimiter()
simpleLimiter := NewSimpleLimiter(0, 0)

collectionRateLimiters := simpleLimiter.rateLimiter.GetOrCreateCollectionLimiters(0, int64(1), newDatabaseLimiter,
func() *rlinternal.RateLimiterNode {
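A natural follow-on test for the new method, written in the same style as the subtests above, might look like the sketch below. The concrete wait interval, attempt count, and payload size are illustrative, and the context and time imports follow the rest of the file:

t.Run("test alloc", func(t *testing.T) {
	bak := Params.QuotaConfig.QuotaAndLimitsEnabled.GetValue()
	paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "false")
	defer paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, bak)

	simpleLimiter := NewSimpleLimiter(10*time.Millisecond, 3)
	// With quota checks disabled, Check passes immediately, so Alloc returns on its first attempt.
	err := simpleLimiter.Alloc(context.Background(), 0, nil, internalpb.RateType_DMLDelete, 1024)
	assert.NoError(t, err)
})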
22 changes: 20 additions & 2 deletions internal/proxy/task_delete.go
@@ -231,9 +231,11 @@ type deleteRunner struct {

idAllocator allocator.Interface
tsoAllocatorIns tsoAllocator
limiter types.Limiter

// delete info
schema *schemaInfo
dbID UniqueID
collectionID UniqueID
partitionID UniqueID
partitionKeyMode bool
@@ -259,6 +261,13 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
if err := validateCollectionName(collName); err != nil {
return ErrWithLog(log, "Invalid collection name", err)
}

db, err := globalMetaCache.GetDatabaseInfo(ctx, dr.req.GetDbName())
if err != nil {
return err
}
dr.dbID = db.dbID

dr.collectionID, err = globalMetaCache.GetCollectionID(ctx, dr.req.GetDbName(), collName)
if err != nil {
return ErrWithLog(log, "Failed to get collection id", err)
@@ -428,7 +437,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe
}

taskCh := make(chan *deleteTask, 256)
go dr.receiveQueryResult(ctx, client, taskCh)
go dr.receiveQueryResult(ctx, client, taskCh, partitionIDs)
var allQueryCnt int64
// wait for all delete tasks to finish
for task := range taskCh {
@@ -449,7 +458,7 @@ }
}
}

func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) {
func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask, partitionIDs []int64) {
defer func() {
close(taskCh)
}()
@@ -472,6 +481,15 @@ func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.Q
return
}

if dr.limiter != nil {
err := dr.limiter.Alloc(ctx, dr.dbID, map[int64][]int64{dr.collectionID: partitionIDs}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds()))
if err != nil {
dr.err = err
log.Warn("query stream for delete failed because rate limiter", zap.Int64("msgID", dr.msgID), zap.Error(err))
return
}
}

task, err := dr.produce(ctx, result.GetIds())
if err != nil {
dr.err = err
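Taken together, the delete-by-expression path now charges the rate limiter for the size of each primary-key batch streamed back from the query nodes before it turns the batch into a delete task. The standalone model below keeps only that shape; the real code reads from a querypb stream, charges proto.Size(result.GetIds()), and uses internalpb.RateType_DMLDelete, all of which are simplified away here:

package main

import (
	"context"
	"fmt"
)

// Limiter is the minimal behaviour the delete runner relies on (see the
// interface sketch earlier); here Alloc simply counts bytes against a budget.
type Limiter interface {
	Alloc(ctx context.Context, nBytes int) error
}

type budgetLimiter struct{ remaining int }

func (b *budgetLimiter) Alloc(ctx context.Context, nBytes int) error {
	if nBytes > b.remaining {
		return fmt.Errorf("rate limit exceeded: need %d, have %d", nBytes, b.remaining)
	}
	b.remaining -= nBytes
	return nil
}

// consumeQueryStream mimics deleteRunner.receiveQueryResult: for every batch of
// matching primary keys, charge the limiter before producing a delete task.
func consumeQueryStream(ctx context.Context, limiter Limiter, batches <-chan []int64, produce func([]int64)) error {
	for ids := range batches {
		if limiter != nil {
			// The real code charges proto.Size(result.GetIds()); 8 bytes per id is a stand-in.
			if err := limiter.Alloc(ctx, 8*len(ids)); err != nil {
				return err // the runner records the error and stops forwarding deletes
			}
		}
		produce(ids)
	}
	return nil
}

func main() {
	batches := make(chan []int64, 2)
	batches <- []int64{1, 2, 3}
	batches <- []int64{4, 5, 6, 7}
	close(batches)

	lim := &budgetLimiter{remaining: 40} // enough for the first batch (24 bytes) but not the second (32 bytes)
	err := consumeQueryStream(context.Background(), lim, batches, func(ids []int64) {
		fmt.Println("produced delete task for ids:", ids)
	})
	fmt.Println("stream result:", err)
}

When the allocation still fails after the retries inside Alloc, the runner records the error and the delete request fails, rather than silently dropping the matched IDs.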
(Diff for the remaining 3 changed files not shown.)