diff --git a/internal/querycoordv2/job/job_load.go b/internal/querycoordv2/job/job_load.go index 4ade22ee48c73..f17d64ea8be28 100644 --- a/internal/querycoordv2/job/job_load.go +++ b/internal/querycoordv2/job/job_load.go @@ -19,6 +19,7 @@ package job import ( "context" "fmt" + "reflect" "time" "github.com/cockroachdb/errors" @@ -104,6 +105,14 @@ func (job *LoadCollectionJob) PreExecute() error { return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded collection") } + if !reflect.DeepEqual(collection.GetLoadFields(), req.GetLoadFields()) { + log.Warn("collection with different load field list exists, release this collection first before chaning its replica number", + zap.Int64s("loadedFieldIDs", collection.GetLoadFields()), + zap.Int64s("reqFieldIDs", req.GetLoadFields()), + ) + return merr.WrapErrParameterInvalid(collection.GetLoadFields(), req.GetLoadFields(), "can't change the load field list for loaded collection") + } + return nil } @@ -289,6 +298,14 @@ func (job *LoadPartitionJob) PreExecute() error { return merr.WrapErrParameterInvalid(collection.GetReplicaNumber(), req.GetReplicaNumber(), "can't change the replica number for loaded partitions") } + if !reflect.DeepEqual(collection.GetLoadFields(), req.GetLoadFields()) { + log.Warn("collection with different load field list exists, release this collection first before chaning its replica number", + zap.Int64s("loadedFieldIDs", collection.GetLoadFields()), + zap.Int64s("reqFieldIDs", req.GetLoadFields()), + ) + return merr.WrapErrParameterInvalid(collection.GetLoadFields(), req.GetLoadFields(), "can't change the load field list for loaded collection") + } + return nil } diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index 276234990de1b..e919d28b7a240 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -307,6 +307,32 @@ func (suite *JobSuite) TestLoadCollection() { suite.ErrorIs(err, merr.ErrParameterInvalid) } + // Test load existed collection with different load fields + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { + continue + } + req := &querypb.LoadCollectionRequest{ + CollectionID: collection, + LoadFields: []int64{100, 101}, + } + job := NewLoadCollectionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.ErrorIs(err, merr.ErrParameterInvalid) + } + // Test load partition while collection exists for _, collection := range suite.collections { if suite.loadTypes[collection] != querypb.LoadType_LoadCollection { @@ -514,6 +540,34 @@ func (suite *JobSuite) TestLoadPartition() { suite.ErrorIs(err, merr.ErrParameterInvalid) } + // Test load partition with different load fields + for _, collection := range suite.collections { + if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { + continue + } + + req := &querypb.LoadPartitionsRequest{ + CollectionID: collection, + PartitionIDs: suite.partitions[collection], + LoadFields: []int64{100, 101}, + } + job := NewLoadPartitionJob( + ctx, + req, + suite.dist, + suite.meta, + suite.broker, + suite.cluster, + suite.targetMgr, + suite.targetObserver, + suite.collectionObserver, + suite.nodeMgr, + ) + suite.scheduler.Add(job) + err := job.Wait() + suite.ErrorIs(err, merr.ErrParameterInvalid) + } + // Test load partition with more partition for _, collection := range suite.collections { if suite.loadTypes[collection] != querypb.LoadType_LoadPartition {