Skip to content

Commit

Permalink
enhance: Make dynamic load/release partition follow targets (milvus-i…
Browse files Browse the repository at this point in the history
…o#38059)

Related to milvus-io#37849

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Dec 5, 2024
1 parent 32645fc commit 051bc28
Show file tree
Hide file tree
Showing 26 changed files with 399 additions and 539 deletions.
22 changes: 2 additions & 20 deletions internal/querycoordv2/job/job_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type LoadCollectionJob struct {
dist *meta.DistributionManager
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
collectionObserver *observers.CollectionObserver
Expand All @@ -63,7 +62,6 @@ func NewLoadCollectionJob(
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
collectionObserver *observers.CollectionObserver,
Expand All @@ -72,11 +70,10 @@ func NewLoadCollectionJob(
return &LoadCollectionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
undo: NewUndoList(ctx, meta, targetMgr, targetObserver),
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
collectionObserver: collectionObserver,
Expand Down Expand Up @@ -193,12 +190,6 @@ func (job *LoadCollectionJob) Execute() error {
job.undo.IsReplicaCreated = true
}

// 3. loadPartitions on QueryNodes
err = loadPartitions(job.ctx, job.meta, job.cluster, job.broker, true, req.GetCollectionID(), lackPartitionIDs...)
if err != nil {
return err
}

// 4. put collection/partitions meta
partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition {
return &meta.Partition{
Expand Down Expand Up @@ -264,7 +255,6 @@ type LoadPartitionJob struct {
dist *meta.DistributionManager
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
collectionObserver *observers.CollectionObserver
Expand All @@ -277,7 +267,6 @@ func NewLoadPartitionJob(
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
collectionObserver *observers.CollectionObserver,
Expand All @@ -286,11 +275,10 @@ func NewLoadPartitionJob(
return &LoadPartitionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
undo: NewUndoList(ctx, meta, targetMgr, targetObserver),
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
collectionObserver: collectionObserver,
Expand Down Expand Up @@ -399,12 +387,6 @@ func (job *LoadPartitionJob) Execute() error {
job.undo.IsReplicaCreated = true
}

// 3. loadPartitions on QueryNodes
err = loadPartitions(job.ctx, job.meta, job.cluster, job.broker, true, req.GetCollectionID(), lackPartitionIDs...)
if err != nil {
return err
}

// 4. put collection/partitions meta
partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition {
return &meta.Partition{
Expand Down
8 changes: 2 additions & 6 deletions internal/querycoordv2/job/job_release.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func NewReleaseCollectionJob(ctx context.Context,
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
Expand All @@ -65,7 +64,6 @@ func NewReleaseCollectionJob(ctx context.Context,
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
Expand All @@ -86,7 +84,6 @@ func (job *ReleaseCollectionJob) Execute() error {
toRelease := lo.Map(loadedPartitions, func(partition *meta.Partition, _ int) int64 {

Check failure on line 84 in internal/querycoordv2/job/job_release.go

View workflow job for this annotation

GitHub Actions / Code Checker MacOS 13

toRelease declared and not used
return partition.GetPartitionID()
})
releasePartitions(job.ctx, job.meta, job.cluster, req.GetCollectionID(), toRelease...)

err := job.meta.CollectionManager.RemoveCollection(job.ctx, req.GetCollectionID())
if err != nil {
Expand Down Expand Up @@ -137,7 +134,6 @@ func NewReleasePartitionJob(ctx context.Context,
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
Expand All @@ -149,7 +145,6 @@ func NewReleasePartitionJob(ctx context.Context,
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
Expand Down Expand Up @@ -178,7 +173,6 @@ func (job *ReleasePartitionJob) Execute() error {
log.Warn("releasing partition(s) not loaded")
return nil
}
releasePartitions(job.ctx, job.meta, job.cluster, req.GetCollectionID(), toRelease...)

// If all partitions are released, clear all
if len(toRelease) == len(loadedPartitions) {
Expand Down Expand Up @@ -211,6 +205,8 @@ func (job *ReleasePartitionJob) Execute() error {
return errors.Wrap(err, msg)
}
job.targetObserver.ReleasePartition(req.GetCollectionID(), toRelease...)
// wait current target updated, so following querys will act as expected
waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID())
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...)
}
return nil
Expand Down
34 changes: 17 additions & 17 deletions internal/querycoordv2/job/job_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,31 +25,36 @@ import (

"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
)

type SyncNewCreatedPartitionJob struct {
*BaseJob
req *querypb.SyncNewCreatedPartitionRequest
meta *meta.Meta
cluster session.Cluster
broker meta.Broker
req *querypb.SyncNewCreatedPartitionRequest
meta *meta.Meta
cluster session.Cluster
broker meta.Broker
targetObserver *observers.TargetObserver
targetMgr meta.TargetManagerInterface
}

func NewSyncNewCreatedPartitionJob(
ctx context.Context,
req *querypb.SyncNewCreatedPartitionRequest,
meta *meta.Meta,
cluster session.Cluster,
broker meta.Broker,
targetObserver *observers.TargetObserver,
targetMgr meta.TargetManagerInterface,
) *SyncNewCreatedPartitionJob {
return &SyncNewCreatedPartitionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
meta: meta,
cluster: cluster,
broker: broker,
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
meta: meta,
broker: broker,
targetObserver: targetObserver,
targetMgr: targetMgr,
}
}

Expand All @@ -75,11 +80,6 @@ func (job *SyncNewCreatedPartitionJob) Execute() error {
return nil
}

err := loadPartitions(job.ctx, job.meta, job.cluster, job.broker, false, req.GetCollectionID(), req.GetPartitionID())
if err != nil {
return err
}

partition := &meta.Partition{
PartitionLoadInfo: &querypb.PartitionLoadInfo{
CollectionID: req.GetCollectionID(),
Expand All @@ -89,12 +89,12 @@ func (job *SyncNewCreatedPartitionJob) Execute() error {
LoadPercentage: 100,
CreatedAt: time.Now(),
}
err = job.meta.CollectionManager.PutPartition(job.ctx, partition)
err := job.meta.CollectionManager.PutPartition(job.ctx, partition)
if err != nil {
msg := "failed to store partitions"
log.Warn(msg, zap.Error(err))
return errors.Wrap(err, msg)
}

return nil
return waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID())
}
Loading

0 comments on commit 051bc28

Please sign in to comment.