From 5e90f348fcbba7da121eaaaee4b539f168ead186 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 11 Nov 2024 10:14:26 +0800 Subject: [PATCH] enhance: Handle legacy proxy load fields request (#37565) Related to #35415 In rolling upgrade, legacy proxy may dispatch load request wit empty load field list. The upgraded querycoord may report error by mistake that load field list is changed. This PR: - Auto field empty load field list with all user field ids - Refine the error messag when load field list updates - Refine load job unit test with service cases Signed-off-by: Congqi Xia --- internal/datanode/importv2/pool_test.go | 3 +- .../metastore/mocks/mock_rootcoord_catalog.go | 2 +- internal/querycoordv2/job/job_load.go | 20 +- internal/querycoordv2/job/job_test.go | 316 +++++++++++++++--- 4 files changed, 282 insertions(+), 59 deletions(-) diff --git a/internal/datanode/importv2/pool_test.go b/internal/datanode/importv2/pool_test.go index 06873c6d31ae5..4449a5031c812 100644 --- a/internal/datanode/importv2/pool_test.go +++ b/internal/datanode/importv2/pool_test.go @@ -20,9 +20,10 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" + "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/stretchr/testify/assert" ) func TestResizePools(t *testing.T) { diff --git a/internal/metastore/mocks/mock_rootcoord_catalog.go b/internal/metastore/mocks/mock_rootcoord_catalog.go index 646eb849ae756..8c35d288c1143 100644 --- a/internal/metastore/mocks/mock_rootcoord_catalog.go +++ b/internal/metastore/mocks/mock_rootcoord_catalog.go @@ -1879,7 +1879,7 @@ func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) Return(_a0 *milvuspb.Privileg return _c } -func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) (*milvuspb.PrivilegeGroupInfo,error)) *RootCoordCatalog_GetPrivilegeGroup_Call { +func (_c *RootCoordCatalog_GetPrivilegeGroup_Call) RunAndReturn(run func(context.Context, string) (*milvuspb.PrivilegeGroupInfo, error)) *RootCoordCatalog_GetPrivilegeGroup_Call { _c.Call.Return(run) return _c } diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 4f438a9da11c2..f7c75da2fc33a 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -27,11 +27,13 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/observers" "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -109,8 +111,15 @@ func (job *LoadCollectionJob) PreExecute() error { return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded collection") } + // handle legacy proxy load request + if len(req.GetLoadFields()) == 0 { + req.LoadFields = lo.FilterMap(req.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) (int64, bool) { + return field.GetFieldID(), field.GetFieldID() >= common.StartOfUserFieldID + }) + } + if !funcutil.SliceSetEqual(collection.GetLoadFields(), req.GetLoadFields()) { - log.Warn("collection with different load field list exists, release this collection first before chaning its replica number", + log.Warn("collection with different load field list exists, release this collection first before chaning its load fields", zap.Int64s("loadedFieldIDs", collection.GetLoadFields()), zap.Int64s("reqFieldIDs", req.GetLoadFields()), ) @@ -314,8 +323,15 @@ func (job *LoadPartitionJob) PreExecute() error { return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded partitions") } + // handle legacy proxy load request + if len(req.GetLoadFields()) == 0 { + req.LoadFields = lo.FilterMap(req.GetSchema().GetFields(), func(field *schemapb.FieldSchema, _ int) (int64, bool) { + return field.GetFieldID(), field.GetFieldID() >= common.StartOfUserFieldID + }) + } + if !funcutil.SliceSetEqual(collection.GetLoadFields(), req.GetLoadFields()) { - log.Warn("collection with different load field list exists, release this collection first before chaning its replica number", + log.Warn("collection with different load field list exists, release this collection first before chaning its load fields", zap.Int64s("loadedFieldIDs", collection.GetLoadFields()), zap.Int64s("reqFieldIDs", req.GetLoadFields()), ) diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index e919d28b7a240..b3d0f58d62ed5 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/rgpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" @@ -218,7 +219,7 @@ func (suite *JobSuite) BeforeTest(suiteName, testName string) { for collection, partitions := range suite.partitions { suite.broker.EXPECT(). GetPartitions(mock.Anything, collection). - Return(partitions, nil) + Return(partitions, nil).Maybe() } } @@ -307,32 +308,6 @@ func (suite *JobSuite) TestLoadCollection() { suite.ErrorIs(err, merr.ErrParameterInvalid) } - // Test load existed collection with different load fields - for _, collection := range suite.collections { - if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { - continue - } - req := &querypb.LoadCollectionRequest{ - CollectionID: collection, - LoadFields: []int64{100, 101}, - } - job := NewLoadCollectionJob( - ctx, - req, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(job) - err := job.Wait() - suite.ErrorIs(err, merr.ErrParameterInvalid) - } - // Test load partition while collection exists for _, collection := range suite.collections { if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { @@ -450,6 +425,131 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() { } } +func (suite *JobSuite) TestLoadCollectionWithLoadFields() { + ctx := context.Background() + + suite.Run("init_load", func() { + // Test load collection + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + // Load with 1 replica + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + LoadFields: []int64{100, 101, 102}, + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + suite.EqualValues(1, suite.meta.GetReplicaNumber(collection)) + suite.targetMgr.UpdateCollectionCurrentTarget(collection) + suite.assertCollectionLoaded(collection) + } + }) + + suite.Run("load_again_same_fields", func() { + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + LoadFields: []int64{102, 101, 100}, // field id order shall not matter + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) + + suite.Run("load_again_diff_fields", func() { + // Test load existed collection with different load fields + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + LoadFields: []int64{100, 101}, + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.ErrorIs(err, merr.ErrParameterInvalid) + } + }) + + suite.Run("load_from_legacy_proxy", func() { + // Test load again with legacy proxy + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: 100}, + {FieldID: 101}, + {FieldID: 102}, + }, + }, + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) +} + func (suite *JobSuite) TestLoadPartition() { ctx := context.Background() @@ -540,34 +640,6 @@ func (suite *JobSuite) TestLoadPartition() { suite.ErrorIs(err, merr.ErrParameterInvalid) } - // Test load partition with different load fields - for _, collection := range suite.collections { - if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { - continue - } - - req := &querypb.LoadPartitionsRequest{ - CollectionID: collection, - PartitionIDs: suite.partitions[collection], - LoadFields: []int64{100, 101}, - } - job := NewLoadPartitionJob( - ctx, - req, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.collectionObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(job) - err := job.Wait() - suite.ErrorIs(err, merr.ErrParameterInvalid) - } - // Test load partition with more partition for _, collection := range suite.collections { if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { @@ -682,6 +754,140 @@ func (suite *JobSuite) TestLoadPartition() { suite.ErrorIs(err, merr.ErrResourceGroupNodeNotEnough) } +func (suite *JobSuite) TestLoadPartitionWithLoadFields() { + ctx := context.Background() + + suite.Run("init_load", func() { + // Test load partition + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + // Load with 1 replica + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + ReplicaNumber: 1, + LoadFields: []int64{100, 101, 102}, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + suite.EqualValues(1, suite.meta.GetReplicaNumber(collection)) + suite.targetMgr.UpdateCollectionCurrentTarget(collection) + suite.assertCollectionLoaded(collection) + } + }) + + suite.Run("load_with_same_load_fields", func() { + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + // Load with 1 replica + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + ReplicaNumber: 1, + LoadFields: []int64{102, 101, 100}, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) + + suite.Run("load_with_diff_load_fields", func() { + // Test load partition with different load fields + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + LoadFields: []int64{100, 101}, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.ErrorIs(err, merr.ErrParameterInvalid) + } + }) + + suite.Run("load_legacy_proxy", func() { + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + // Load with 1 replica + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + ReplicaNumber: 1, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: 100}, + {FieldID: 101}, + {FieldID: 102}, + }, + }, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.NoError(err) + } + }) +} + func (suite *JobSuite) TestDynamicLoad() { ctx := context.Background()