Skip to content

Commit

Permalink
fix review comment
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Dec 25, 2024
1 parent c99cb48 commit c554a04
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions internal/querycoordv2/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,18 +487,15 @@ func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64)
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()

targetActions := make(map[int64][]Action, 0)
for _, t := range scheduler.segmentTasks {
if collectionID != -1 && collectionID != t.CollectionID() {
targetActions := make(map[int64][]Action)
for _, task := range scheduler.segmentTasks { // Map key: replicaSegmentIndex
taskCollID := task.CollectionID()
if collectionID != -1 && collectionID != taskCollID {
continue
}
for _, action := range t.Actions() {
if action.Node() == nodeID || nodeID == -1 {
if _, ok := targetActions[t.CollectionID()]; !ok {
targetActions[t.CollectionID()] = make([]Action, 0)
}
targetActions[t.CollectionID()] = append(targetActions[t.CollectionID()], action)
}
actions := filterActions(task.Actions(), nodeID)
if len(actions) > 0 {
targetActions[taskCollID] = append(targetActions[taskCollID], actions...)
}
}

Expand All @@ -509,24 +506,32 @@ func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64)
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()

targetActions := make(map[int64][]Action, 0)
for _, t := range scheduler.channelTasks {
if collectionID != -1 && collectionID != t.CollectionID() {
targetActions := make(map[int64][]Action)
for _, task := range scheduler.channelTasks { // Map key: replicaChannelIndex
taskCollID := task.CollectionID()
if collectionID != -1 && collectionID != taskCollID {
continue
}
for _, action := range t.Actions() {
if action.Node() == nodeID || nodeID == -1 {
if _, ok := targetActions[t.CollectionID()]; !ok {
targetActions[t.CollectionID()] = make([]Action, 0)
}
targetActions[t.CollectionID()] = append(targetActions[t.CollectionID()], action)
}
actions := filterActions(task.Actions(), nodeID)
if len(actions) > 0 {
targetActions[taskCollID] = append(targetActions[taskCollID], actions...)
}
}

return scheduler.calculateTaskDelta(targetActions)
}

// filter actions by nodeID
func filterActions(actions []Action, nodeID int64) []Action {
filtered := make([]Action, 0, len(actions))
for _, action := range actions {
if nodeID == -1 || action.Node() == nodeID {
filtered = append(filtered, action)
}
}
return filtered
}

func (scheduler *taskScheduler) calculateTaskDelta(targetActions map[int64][]Action) int {
sum := 0
for collectionID, actions := range targetActions {
Expand Down

0 comments on commit c554a04

Please sign in to comment.