Skip to content

Commit

Permalink
fix: sync part stats task cannot be finished(milvus-io#30376) (milvus…
Browse files Browse the repository at this point in the history
…-io#34027)

related: milvus-io#30376
also: refine log output for query_coord task by rephrasing action string

Signed-off-by: MrPresent-Han <[email protected]>
Co-authored-by: MrPresent-Han <[email protected]>
Co-authored-by: wayblink <[email protected]>
  • Loading branch information
3 people committed Jul 2, 2024
1 parent 8b13589 commit 8253575
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 15 deletions.
27 changes: 25 additions & 2 deletions internal/querycoordv2/task/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
package task

import (
"reflect"
"fmt"

"github.com/samber/lo"
"go.uber.org/atomic"

"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -50,6 +51,7 @@ type Action interface {
Node() int64
Type() ActionType
IsFinished(distMgr *meta.DistributionManager) bool
String() string
}

type BaseAction struct {
Expand Down Expand Up @@ -78,6 +80,10 @@ func (action *BaseAction) Shard() string {
return action.shard
}

func (action *BaseAction) String() string {
return fmt.Sprintf(`{[type=%v][node=%d][shard=%v]}`, action.Type(), action.Node(), action.Shard())
}

type SegmentAction struct {
*BaseAction

Expand Down Expand Up @@ -153,6 +159,10 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool
return true
}

func (action *SegmentAction) String() string {
return action.BaseAction.String() + fmt.Sprintf(`{[segmentID=%d][scope=%d]}`, action.SegmentID(), action.Scope())
}

type ChannelAction struct {
*BaseAction
}
Expand Down Expand Up @@ -218,6 +228,19 @@ func (action *LeaderAction) Version() typeutil.UniqueID {
return action.version
}

func (action *LeaderAction) PartStats() map[int64]int64 {
return action.partStatsVersions
}

func (action *LeaderAction) String() string {
partStatsStr := ""
if action.PartStats() != nil {
partStatsStr = fmt.Sprintf("%v", action.PartStats())
}
return action.BaseAction.String() + fmt.Sprintf(`{[leaderID=%v][segmentID=%d][version=%d][partStats=%s]}`,
action.GetLeaderID(), action.SegmentID(), action.Version(), partStatsStr)
}

func (action *LeaderAction) GetLeaderID() typeutil.UniqueID {
return action.leaderID
}
Expand All @@ -237,7 +260,7 @@ func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool {
case ActionTypeReduce:
return action.rpcReturned.Load() && (dist == nil || dist.NodeID != action.Node())
case ActionTypeUpdate:
return action.rpcReturned.Load() && (dist != nil && reflect.DeepEqual(action.partStatsVersions, view.PartitionStatsVersions))
return action.rpcReturned.Load() && common.MapEquals(action.partStatsVersions, view.PartitionStatsVersions)
}
return false
}
11 changes: 2 additions & 9 deletions internal/querycoordv2/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,8 @@ func (task *baseTask) SetReason(reason string) {

func (task *baseTask) String() string {
var actionsStr string
for i, action := range task.actions {
if realAction, ok := action.(*SegmentAction); ok {
actionsStr += fmt.Sprintf(`{[type=%v][node=%d][streaming=%v]}`, action.Type(), action.Node(), realAction.Scope() == querypb.DataScope_Streaming)
} else {
actionsStr += fmt.Sprintf(`{[type=%v][node=%d]}`, action.Type(), action.Node())
}
if i != len(task.actions)-1 {
actionsStr += ", "
}
for _, action := range task.actions {
actionsStr += action.String() + ","
}
return fmt.Sprintf(
"[id=%d] [type=%s] [source=%s] [reason=%s] [collectionID=%d] [replicaID=%d] [resourceGroup=%s] [priority=%s] [actionsCount=%d] [actions=%s]",
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/delegator/delegator.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,8 @@ func (sd *shardDelegator) loadPartitionStats(ctx context.Context, partStatsVersi
defer sd.partitionStatsMut.Unlock()
sd.partitionStats[partID] = partStats
}()
log.Info("Updated partitionStats for partition", zap.Int64("partitionID", partID))
log.Info("Updated partitionStats for partition", zap.Int64("collectionID", sd.collectionID), zap.Int64("partitionID", partID),
zap.Int64("newVersion", newVersion), zap.Int64("oldVersion", curStats.GetVersion()))
}
}

Expand Down
7 changes: 4 additions & 3 deletions internal/querynodev2/delegator/segment_pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ func PruneSegments(ctx context.Context,
plan := planpb.PlanNode{}
err := proto.Unmarshal(expr, &plan)
if err != nil {
log.Error("failed to unmarshall serialized expr from bytes, failed the operation")
log.Ctx(ctx).Error("failed to unmarshall serialized expr from bytes, failed the operation")
return
}
expr, err := exprutil.ParseExprFromPlan(&plan)
if err != nil {
log.Error("failed to parse expr from plan, failed the operation")
log.Ctx(ctx).Error("failed to parse expr from plan, failed the operation")
return
}
targetRanges, matchALL := exprutil.ParseRanges(expr, exprutil.ClusteringKey)
Expand Down Expand Up @@ -123,7 +123,8 @@ func PruneSegments(ctx context.Context,
metrics.QueryNodeSegmentPruneRatio.
WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(typeutil.IsVectorType(clusteringKeyField.GetDataType()))).
Observe(float64(realFilteredSegments / totalSegNum))
log.Debug("Pruned segment for search/query",
log.Ctx(ctx).Debug("Pruned segment for search/query",
zap.Int("filtered_segment_num[stats]", len(filteredSegments)),
zap.Int("filtered_segment_num[excluded]", realFilteredSegments),
zap.Int("total_segment_num", totalSegNum),
zap.Float32("filtered_ratio", float32(realFilteredSegments)/float32(totalSegNum)),
Expand Down
3 changes: 3 additions & 0 deletions internal/storage/partition_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func NewPartitionStatsSnapshot() *PartitionStatsSnapshot {
}

func (ps *PartitionStatsSnapshot) GetVersion() int64 {
if ps == nil {
return 0
}
return ps.Version
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/common/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,16 @@ func (m Str2Str) Equal(other Str2Str) bool {
func CloneStr2Str(m Str2Str) Str2Str {
return m.Clone()
}

func MapEquals(m1, m2 map[int64]int64) bool {
if len(m1) != len(m2) {
return false
}
for k1, v1 := range m1 {
v2, exist := m2[k1]
if !exist || v1 != v2 {
return false
}
}
return true
}
22 changes: 22 additions & 0 deletions pkg/common/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,25 @@ func TestCloneStr2Str(t *testing.T) {
})
}
}

func TestMapEqual(t *testing.T) {
{
m1 := map[int64]int64{1: 11, 2: 22, 3: 33}
m2 := map[int64]int64{1: 11, 2: 22, 3: 33}
assert.True(t, MapEquals(m1, m2))
}
{
m1 := map[int64]int64{1: 11, 2: 23, 3: 33}
m2 := map[int64]int64{1: 11, 2: 22, 3: 33}
assert.False(t, MapEquals(m1, m2))
}
{
m1 := map[int64]int64{1: 11, 2: 23, 3: 33}
m2 := map[int64]int64{1: 11, 2: 22}
assert.False(t, MapEquals(m1, m2))
}
{
m1 := map[int64]int64{1: 11, 2: 23, 3: 33}
assert.False(t, MapEquals(m1, nil))
}
}

0 comments on commit 8253575

Please sign in to comment.