diff --git a/internal/distributed/proxy/httpserver/handler_v1.go b/internal/distributed/proxy/httpserver/handler_v1.go index c68544482253e..6f3af0e37de60 100644 --- a/internal/distributed/proxy/httpserver/handler_v1.go +++ b/internal/distributed/proxy/httpserver/handler_v1.go @@ -454,6 +454,13 @@ func (h *HandlersV1) query(c *gin.Context) { if !h.checkDatabase(ctx, c, req.DbName) { return } + if _, err := CheckLimiter(&req, h.proxy); err != nil { + c.AbortWithStatusJSON(http.StatusOK, gin.H{ + HTTPReturnCode: merr.Code(merr.ErrHTTPRateLimit), + HTTPReturnMessage: merr.ErrHTTPRateLimit.Error() + ", error: " + err.Error(), + }) + return + } response, err := h.proxy.Query(ctx, &req) if err == nil { err = merr.Error(response.GetStatus()) @@ -523,6 +530,13 @@ func (h *HandlersV1) get(c *gin.Context) { }) return } + if _, err := CheckLimiter(&req, h.proxy); err != nil { + c.AbortWithStatusJSON(http.StatusOK, gin.H{ + HTTPReturnCode: merr.Code(merr.ErrHTTPRateLimit), + HTTPReturnMessage: merr.ErrHTTPRateLimit.Error() + ", error: " + err.Error(), + }) + return + } req.Expr = filter response, err := h.proxy.Query(ctx, &req) if err == nil { @@ -594,6 +608,13 @@ func (h *HandlersV1) delete(c *gin.Context) { } req.Expr = filter } + if _, err := CheckLimiter(&req, h.proxy); err != nil { + c.AbortWithStatusJSON(http.StatusOK, gin.H{ + HTTPReturnCode: merr.Code(merr.ErrHTTPRateLimit), + HTTPReturnMessage: merr.ErrHTTPRateLimit.Error() + ", error: " + err.Error(), + }) + return + } response, err := h.proxy.Delete(ctx, &req) if err == nil { err = merr.Error(response.GetStatus()) @@ -669,6 +690,14 @@ func (h *HandlersV1) insert(c *gin.Context) { }) return } + if _, err := CheckLimiter(&req, h.proxy); err != nil { + log.Warn("high level restful api, fail to insert for limiting", zap.Error(err)) + c.AbortWithStatusJSON(http.StatusOK, gin.H{ + HTTPReturnCode: merr.Code(merr.ErrHTTPRateLimit), + HTTPReturnMessage: merr.ErrHTTPRateLimit.Error() + ", error: " + err.Error(), + }) + return + } response, err := h.proxy.Insert(ctx, &req) if err == nil { err = merr.Error(response.GetStatus()) @@ -765,6 +794,13 @@ func (h *HandlersV1) upsert(c *gin.Context) { }) return } + if _, err := CheckLimiter(&req, h.proxy); err != nil { + c.AbortWithStatusJSON(http.StatusOK, gin.H{ + HTTPReturnCode: merr.Code(merr.ErrHTTPRateLimit), + HTTPReturnMessage: merr.ErrHTTPRateLimit.Error() + ", error: " + err.Error(), + }) + return + } response, err := h.proxy.Upsert(ctx, &req) if err == nil { err = merr.Error(response.GetStatus()) @@ -859,6 +895,13 @@ func (h *HandlersV1) search(c *gin.Context) { if !h.checkDatabase(ctx, c, req.DbName) { return } + if _, err := CheckLimiter(&req, h.proxy); err != nil { + c.AbortWithStatusJSON(http.StatusOK, gin.H{ + HTTPReturnCode: merr.Code(merr.ErrHTTPRateLimit), + HTTPReturnMessage: merr.ErrHTTPRateLimit.Error() + ", error: " + err.Error(), + }) + return + } response, err := h.proxy.Search(ctx, &req) if err == nil { err = merr.Error(response.GetStatus()) diff --git a/internal/distributed/proxy/httpserver/handler_v1_test.go b/internal/distributed/proxy/httpserver/handler_v1_test.go index b854c1e2c4377..85524e1fa16d8 100644 --- a/internal/distributed/proxy/httpserver/handler_v1_test.go +++ b/internal/distributed/proxy/httpserver/handler_v1_test.go @@ -506,6 +506,9 @@ func TestQuery(t *testing.T) { exceptCode: 200, expectedBody: "{\"code\":200,\"data\":[{\"book_id\":1,\"book_intro\":[0.1,0.11],\"word_count\":1000},{\"book_id\":2,\"book_intro\":[0.2,0.22],\"word_count\":2000},{\"book_id\":3,\"book_intro\":[0.3,0.33],\"word_count\":3000}]}", }) + // disable rate limit + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "false") + defer paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "true") for _, tt := range testCases { reqs := []*http.Request{genQueryRequest(), genGetRequest()} @@ -590,7 +593,9 @@ func TestDelete(t *testing.T) { exceptCode: 200, expectedBody: "{\"code\":200,\"data\":{}}", }) - + // disable rate limit + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "false") + defer paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "true") for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { testEngine := initHTTPServer(tt.mp, true) @@ -614,11 +619,15 @@ func TestDelete(t *testing.T) { } func TestDeleteForFilter(t *testing.T) { + paramtable.Init() jsonBodyList := [][]byte{ []byte(`{"collectionName": "` + DefaultCollectionName + `" , "id": [1,2,3]}`), []byte(`{"collectionName": "` + DefaultCollectionName + `" , "filter": "id in [1,2,3]"}`), []byte(`{"collectionName": "` + DefaultCollectionName + `" , "id": [1,2,3], "filter": "id in [1,2,3]"}`), } + // disable rate limit + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "false") + defer paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "true") for _, jsonBody := range jsonBodyList { t.Run("delete success", func(t *testing.T) { mp := mocks.NewMockProxy(t) @@ -716,6 +725,9 @@ func TestInsert(t *testing.T) { HTTPCollectionName: DefaultCollectionName, HTTPReturnData: rows[0], }) + // disable rate limit + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "false") + defer paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "true") for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { testEngine := initHTTPServer(tt.mp, true) @@ -761,6 +773,9 @@ func TestInsertForDataType(t *testing.T) { "[success]with dynamic field": withDynamicField(newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64))), "[success]with array fields": withArrayField(newCollectionSchema(generateCollectionSchema(schemapb.DataType_Int64))), } + // disable rate limit + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "false") + defer paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "true") for name, schema := range schemas { t.Run(name, func(t *testing.T) { mp := mocks.NewMockProxy(t) @@ -828,6 +843,9 @@ func TestReturnInt64(t *testing.T) { schemapb.DataType_Int64: "1,2,3", schemapb.DataType_VarChar: "\"1\",\"2\",\"3\"", } + // disable rate limit + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "false") + defer paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "true") for _, dataType := range schemas { t.Run("[insert]httpCfg.allow: false", func(t *testing.T) { schema := newCollectionSchema(generateCollectionSchema(dataType)) @@ -1157,6 +1175,9 @@ func TestUpsert(t *testing.T) { HTTPCollectionName: DefaultCollectionName, HTTPReturnData: rows[0], }) + // disable rate limit + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "false") + defer paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "true") for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { testEngine := initHTTPServer(tt.mp, true) @@ -1255,6 +1276,9 @@ func TestSearch(t *testing.T) { exceptCode: 200, expectedBody: "{\"code\":200,\"data\":[{\"book_id\":1,\"book_intro\":[0.1,0.11],\"distance\":0.01,\"word_count\":1000},{\"book_id\":2,\"book_intro\":[0.2,0.22],\"distance\":0.04,\"word_count\":2000},{\"book_id\":3,\"book_intro\":[0.3,0.33],\"distance\":0.09,\"word_count\":3000}]}", }) + // disable rate limit + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "false") + defer paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "true") for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { diff --git a/internal/distributed/proxy/httpserver/handler_v2.go b/internal/distributed/proxy/httpserver/handler_v2.go index 913953a15a2f3..609c3b3cce156 100644 --- a/internal/distributed/proxy/httpserver/handler_v2.go +++ b/internal/distributed/proxy/httpserver/handler_v2.go @@ -566,6 +566,10 @@ func (h *HandlersV2) query(ctx context.Context, c *gin.Context, anyReq any, dbNa req.QueryParams = append(req.QueryParams, &commonpb.KeyValuePair{Key: ParamLimit, Value: strconv.FormatInt(int64(httpReq.Limit), 10)}) } resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Query", func(reqCtx context.Context, req any) (interface{}, error) { + resp, err := CheckLimiter(req, h.proxy) + if err != nil { + return resp, err + } return h.proxy.Query(reqCtx, req.(*milvuspb.QueryRequest)) }) if err == nil { @@ -609,6 +613,10 @@ func (h *HandlersV2) get(ctx context.Context, c *gin.Context, anyReq any, dbName Expr: filter, } resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Query", func(reqCtx context.Context, req any) (interface{}, error) { + resp, err := CheckLimiter(req, h.proxy) + if err != nil { + return resp, err + } return h.proxy.Query(reqCtx, req.(*milvuspb.QueryRequest)) }) if err == nil { @@ -653,6 +661,10 @@ func (h *HandlersV2) delete(ctx context.Context, c *gin.Context, anyReq any, dbN req.Expr = filter } resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Delete", func(reqCtx context.Context, req any) (interface{}, error) { + resp, err := CheckLimiter(req, h.proxy) + if err != nil { + return resp, err + } return h.proxy.Delete(reqCtx, req.(*milvuspb.DeleteRequest)) }) if err == nil { @@ -694,6 +706,10 @@ func (h *HandlersV2) insert(ctx context.Context, c *gin.Context, anyReq any, dbN return nil, err } resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Insert", func(reqCtx context.Context, req any) (interface{}, error) { + resp, err := CheckLimiter(req, h.proxy) + if err != nil { + return resp, err + } return h.proxy.Insert(reqCtx, req.(*milvuspb.InsertRequest)) }) if err == nil { @@ -756,6 +772,10 @@ func (h *HandlersV2) upsert(ctx context.Context, c *gin.Context, anyReq any, dbN return nil, err } resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Upsert", func(reqCtx context.Context, req any) (interface{}, error) { + resp, err := CheckLimiter(req, h.proxy) + if err != nil { + return resp, err + } return h.proxy.Upsert(reqCtx, req.(*milvuspb.UpsertRequest)) }) if err == nil { @@ -882,6 +902,10 @@ func (h *HandlersV2) search(ctx context.Context, c *gin.Context, anyReq any, dbN GuaranteeTimestamp: BoundedTimestamp, } resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/Search", func(reqCtx context.Context, req any) (interface{}, error) { + resp, err := CheckLimiter(req, h.proxy) + if err != nil { + return resp, err + } return h.proxy.Search(reqCtx, req.(*milvuspb.SearchRequest)) }) if err == nil { diff --git a/internal/distributed/proxy/httpserver/handler_v2_test.go b/internal/distributed/proxy/httpserver/handler_v2_test.go index 5cd092339cb99..165dce455d2b2 100644 --- a/internal/distributed/proxy/httpserver/handler_v2_test.go +++ b/internal/distributed/proxy/httpserver/handler_v2_test.go @@ -53,6 +53,7 @@ func (DefaultReq) GetBase() *commonpb.MsgBase { func (req *DefaultReq) GetDbName() string { return req.DbName } func TestHTTPWrapper(t *testing.T) { + paramtable.Init() postTestCases := []requestBodyTestCase{} postTestCasesTrace := []requestBodyTestCase{} ginHandler := gin.Default() @@ -1181,6 +1182,9 @@ func TestDML(t *testing.T) { errCode: 65535, }) + // disable rate limit + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "false") + defer paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "true") for _, testcase := range queryTestCases { t.Run("query", func(t *testing.T) { bodyReader := bytes.NewReader(testcase.requestBody) @@ -1228,6 +1232,11 @@ func TestSearchV2(t *testing.T) { Status: &StatusSuccess, }, nil).Times(4) mp.EXPECT().Search(mock.Anything, mock.Anything).Return(&milvuspb.SearchResults{Status: commonSuccessStatus, Results: &schemapb.SearchResultData{TopK: int64(0)}}, nil).Twice() + + // disable rate limit + paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "false") + defer paramtable.Get().Save(paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.Key, "true") + testEngine := initHTTPServerV2(mp, false) queryTestCases := []requestBodyTestCase{} queryTestCases = append(queryTestCases, requestBodyTestCase{ diff --git a/internal/distributed/proxy/httpserver/utils.go b/internal/distributed/proxy/httpserver/utils.go index fe5f9eeceddf3..5adc833e17912 100644 --- a/internal/distributed/proxy/httpserver/utils.go +++ b/internal/distributed/proxy/httpserver/utils.go @@ -20,14 +20,47 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proxy" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/parameterutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) +func CheckLimiter(req interface{}, pxy types.ProxyComponent) (any, error) { + if !paramtable.Get().QuotaConfig.QuotaAndLimitsEnabled.GetAsBool() { + return nil, nil + } + // apply limiter for http/http2 server + limiter, err := pxy.GetRateLimiter() + if err != nil { + log.Error("Get proxy rate limiter for httpV1/V2 server failed", zap.Error(err)) + return nil, err + } + + collectionIDs, rt, n, err := proxy.GetRequestInfo(req) + if err != nil { + return nil, err + } + err = limiter.Check(collectionIDs, rt, n) + nodeID := strconv.FormatInt(paramtable.GetNodeID(), 10) + metrics.ProxyRateLimitReqCount.WithLabelValues(nodeID, rt.String(), metrics.TotalLabel).Inc() + if err != nil { + metrics.ProxyRateLimitReqCount.WithLabelValues(nodeID, rt.String(), metrics.FailLabel).Inc() + rsp := proxy.GetFailedResponse(req, err) + if rsp != nil { + return rsp, err + } + } + metrics.ProxyRateLimitReqCount.WithLabelValues(nodeID, rt.String(), metrics.SuccessLabel).Inc() + return nil, nil +} + func ParseUsernamePassword(c *gin.Context) (string, string, bool) { username, password, ok := c.Request.BasicAuth() if !ok { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index f216078e87887..865d59c32c049 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -358,7 +358,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat zap.String("consistency_level", request.ConsistencyLevel.String()), ) - log.Debug(rpcReceived(method)) + log.Info(rpcReceived(method)) if err := node.sched.ddQueue.Enqueue(cct); err != nil { log.Warn( @@ -369,7 +369,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat return merr.Status(err), nil } - log.Debug( + log.Info( rpcEnqueued(method), zap.Uint64("BeginTs", cct.BeginTs()), zap.Uint64("EndTs", cct.EndTs()), @@ -387,7 +387,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat return merr.Status(err), nil } - log.Debug( + log.Info( rpcDone(method), zap.Uint64("BeginTs", cct.BeginTs()), zap.Uint64("EndTs", cct.EndTs()), diff --git a/internal/proxy/rate_limit_interceptor.go b/internal/proxy/rate_limit_interceptor.go index 15a019286ebc0..87cccb47c3873 100644 --- a/internal/proxy/rate_limit_interceptor.go +++ b/internal/proxy/rate_limit_interceptor.go @@ -36,7 +36,7 @@ import ( // RateLimitInterceptor returns a new unary server interceptors that performs request rate limiting. func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor { return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - collectionIDs, rt, n, err := getRequestInfo(req) + collectionIDs, rt, n, err := GetRequestInfo(req) if err != nil { return handler(ctx, req) } @@ -46,7 +46,7 @@ func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor { metrics.ProxyRateLimitReqCount.WithLabelValues(nodeID, rt.String(), metrics.TotalLabel).Inc() if err != nil { metrics.ProxyRateLimitReqCount.WithLabelValues(nodeID, rt.String(), metrics.FailLabel).Inc() - rsp := getFailedResponse(req, err) + rsp := GetFailedResponse(req, err) if rsp != nil { return rsp, nil } @@ -56,8 +56,8 @@ func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor { } } -// getRequestInfo returns collection name and rateType of request and return tokens needed. -func getRequestInfo(req interface{}) ([]int64, internalpb.RateType, int, error) { +// GetRequestInfo returns collection name and rateType of request and return tokens needed. +func GetRequestInfo(req interface{}) ([]int64, internalpb.RateType, int, error) { switch r := req.(type) { case *milvuspb.InsertRequest: collectionID, _ := globalMetaCache.GetCollectionID(context.TODO(), r.GetDbName(), r.GetCollectionName()) @@ -132,8 +132,8 @@ func failedMutationResult(err error) *milvuspb.MutationResult { } } -// getFailedResponse returns failed response. -func getFailedResponse(req any, err error) any { +// GetFailedResponse returns failed response. +func GetFailedResponse(req any, err error) any { switch req.(type) { case *milvuspb.InsertRequest, *milvuspb.DeleteRequest, *milvuspb.UpsertRequest: return failedMutationResult(err) diff --git a/internal/proxy/rate_limit_interceptor_test.go b/internal/proxy/rate_limit_interceptor_test.go index 06c1ee1500c3a..1ec1c912f9f20 100644 --- a/internal/proxy/rate_limit_interceptor_test.go +++ b/internal/proxy/rate_limit_interceptor_test.go @@ -53,7 +53,7 @@ func (l *limiterMock) Alloc(ctx context.Context, collection []int64, rt internal } func TestRateLimitInterceptor(t *testing.T) { - t.Run("test getRequestInfo", func(t *testing.T) { + t.Run("test GetRequestInfo", func(t *testing.T) { mockCache := NewMockCache(t) mockCache.On("GetCollectionID", mock.Anything, // context.Context @@ -61,118 +61,118 @@ func TestRateLimitInterceptor(t *testing.T) { mock.AnythingOfType("string"), ).Return(int64(0), nil) globalMetaCache = mockCache - collection, rt, size, err := getRequestInfo(&milvuspb.InsertRequest{}) + collection, rt, size, err := GetRequestInfo(&milvuspb.InsertRequest{}) assert.NoError(t, err) assert.Equal(t, proto.Size(&milvuspb.InsertRequest{}), size) assert.Equal(t, internalpb.RateType_DMLInsert, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.UpsertRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.UpsertRequest{}) assert.NoError(t, err) assert.Equal(t, proto.Size(&milvuspb.InsertRequest{}), size) assert.Equal(t, internalpb.RateType_DMLUpsert, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.DeleteRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.DeleteRequest{}) assert.NoError(t, err) assert.Equal(t, proto.Size(&milvuspb.DeleteRequest{}), size) assert.Equal(t, internalpb.RateType_DMLDelete, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.ImportRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.ImportRequest{}) assert.NoError(t, err) assert.Equal(t, proto.Size(&milvuspb.ImportRequest{}), size) assert.Equal(t, internalpb.RateType_DMLBulkLoad, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.SearchRequest{Nq: 5}) + collection, rt, size, err = GetRequestInfo(&milvuspb.SearchRequest{Nq: 5}) assert.NoError(t, err) assert.Equal(t, 5, size) assert.Equal(t, internalpb.RateType_DQLSearch, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.QueryRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.QueryRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DQLQuery, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.CreateCollectionRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.CreateCollectionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLCollection, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.LoadCollectionRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.LoadCollectionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLCollection, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.ReleaseCollectionRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.ReleaseCollectionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLCollection, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.DropCollectionRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.DropCollectionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLCollection, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.CreatePartitionRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.CreatePartitionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLPartition, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.LoadPartitionsRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.LoadPartitionsRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLPartition, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.ReleasePartitionsRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.ReleasePartitionsRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLPartition, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.DropPartitionRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.DropPartitionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLPartition, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.CreateIndexRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.CreateIndexRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLIndex, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.DropIndexRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.DropIndexRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLIndex, rt) assert.ElementsMatch(t, collection, []int64{int64(0)}) - collection, rt, size, err = getRequestInfo(&milvuspb.FlushRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.FlushRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLFlush, rt) assert.Len(t, collection, 0) - collection, rt, size, err = getRequestInfo(&milvuspb.ManualCompactionRequest{}) + collection, rt, size, err = GetRequestInfo(&milvuspb.ManualCompactionRequest{}) assert.NoError(t, err) assert.Equal(t, 1, size) assert.Equal(t, internalpb.RateType_DDLCompaction, rt) assert.Len(t, collection, 0) }) - t.Run("test getFailedResponse", func(t *testing.T) { + t.Run("test GetFailedResponse", func(t *testing.T) { testGetFailedResponse := func(req interface{}, rt internalpb.RateType, err error, fullMethod string) { - rsp := getFailedResponse(req, err) + rsp := GetFailedResponse(req, err) assert.NotNil(t, rsp) } @@ -186,9 +186,9 @@ func TestRateLimitInterceptor(t *testing.T) { testGetFailedResponse(&milvuspb.ManualCompactionRequest{}, internalpb.RateType_DDLCompaction, merr.ErrServiceRateLimit, "compaction") // test illegal - rsp := getFailedResponse(&milvuspb.SearchResults{}, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError)) + rsp := GetFailedResponse(&milvuspb.SearchResults{}, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError)) assert.Nil(t, rsp) - rsp = getFailedResponse(nil, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError)) + rsp = GetFailedResponse(nil, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError)) assert.Nil(t, rsp) }) diff --git a/internal/proxy/shard_client.go b/internal/proxy/shard_client.go index c250de1d6aab4..09a44620e52a8 100644 --- a/internal/proxy/shard_client.go +++ b/internal/proxy/shard_client.go @@ -122,7 +122,7 @@ func defaultQueryNodeClientCreator(ctx context.Context, addr string, nodeID int6 return registry.GetInMemoryResolver().ResolveQueryNode(ctx, addr, nodeID) } -// NewShardClientMgr creates a new shardClientMgr +// newShardClientMgr creates a new shardClientMgr func newShardClientMgr(options ...shardClientMgrOpt) *shardClientMgrImpl { s := &shardClientMgrImpl{ clients: struct { diff --git a/internal/rootcoord/garbage_collector.go b/internal/rootcoord/garbage_collector.go index 523191da2274f..1465867b0cb64 100644 --- a/internal/rootcoord/garbage_collector.go +++ b/internal/rootcoord/garbage_collector.go @@ -19,9 +19,12 @@ package rootcoord import ( "context" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/pkg/log" ms "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/commonpbutil" ) @@ -190,7 +193,8 @@ func (c *bgGarbageCollector) notifyCollectionGc(ctx context.Context, coll *model if err := c.s.chanTimeTick.broadcastDmlChannels(coll.PhysicalChannelNames, &msgPack); err != nil { return 0, err } - + log.Info("Sent dropMsg into dml channel success", zap.Int64("colID", coll.CollectionID), + zap.Strings("channels", coll.PhysicalChannelNames), zap.Uint64("ts", ts)) return ts, nil } diff --git a/internal/rootcoord/step.go b/internal/rootcoord/step.go index 5a51996a6c05c..225dbc5d94056 100644 --- a/internal/rootcoord/step.go +++ b/internal/rootcoord/step.go @@ -21,9 +21,12 @@ import ( "fmt" "time" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/pkg/log" ) type stepPriority int @@ -232,6 +235,7 @@ func (s *waitForTsSyncedStep) Execute(ctx context.Context) ([]nestedStep, error) // time.Sleep(Params.ProxyCfg.TimeTickInterval) return nil, fmt.Errorf("ts not synced yet, channel: %s, synced: %d, want: %d", s.channel, syncedTs, s.ts) } + log.Info("target ts has been synced into mq", zap.String("channel", s.channel), zap.Uint64("ts", s.ts)) return nil, nil } diff --git a/internal/rootcoord/step_executor.go b/internal/rootcoord/step_executor.go index f28b51d2ad5e8..4679016f990a3 100644 --- a/internal/rootcoord/step_executor.go +++ b/internal/rootcoord/step_executor.go @@ -66,6 +66,8 @@ func (s *stepStack) Execute(ctx context.Context) *stepStack { if !retry.IsRecoverable(err) { if !skipLog { log.Warn("failed to execute step, not able to reschedule", zap.Error(err), zap.String("step", todo.Desc())) + } else { + log.RatedWarn(60, "failed to execute step, not able to reschedule", zap.Error(err), zap.String("step", todo.Desc())) } return nil } @@ -73,6 +75,8 @@ func (s *stepStack) Execute(ctx context.Context) *stepStack { s.steps = nil // let s can be collected. if !skipLog { log.Warn("failed to execute step, wait for reschedule", zap.Error(err), zap.String("step", todo.Desc())) + } else { + log.RatedWarn(60, "failed to execute step, wait for reschedule", zap.Error(err), zap.String("step", todo.Desc())) } return &stepStack{steps: steps} } diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index dc81e9f6b2809..2d0f737580ab3 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -137,6 +137,7 @@ var ( ErrInvalidInsertData = newMilvusError("fail to deal the insert data", 1804, false) ErrInvalidSearchResult = newMilvusError("fail to parse search result", 1805, false) ErrCheckPrimaryKey = newMilvusError("please check the primary key and its' type can only in [int, string]", 1806, false) + ErrHTTPRateLimit = newMilvusError("request is rejected by limiter", 1807, true) // replicate related ErrDenyReplicateMessage = newMilvusError("deny to use the replicate message in the normal instance", 1900, false)