diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index 54823a988fa44..c0229c4065b1f 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -17,13 +17,14 @@ package task import ( - "reflect" + "fmt" "github.com/samber/lo" "go.uber.org/atomic" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -50,6 +51,7 @@ type Action interface { Node() int64 Type() ActionType IsFinished(distMgr *meta.DistributionManager) bool + String() string } type BaseAction struct { @@ -78,6 +80,10 @@ func (action *BaseAction) Shard() string { return action.shard } +func (action *BaseAction) String() string { + return fmt.Sprintf(`{[type=%v][node=%d][shard=%v]}`, action.Type(), action.Node(), action.Shard()) +} + type SegmentAction struct { *BaseAction @@ -153,6 +159,10 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool return true } +func (action *SegmentAction) String() string { + return action.BaseAction.String() + fmt.Sprintf(`{[segmentID=%d][scope=%d]}`, action.SegmentID(), action.Scope()) +} + type ChannelAction struct { *BaseAction } @@ -218,6 +228,19 @@ func (action *LeaderAction) Version() typeutil.UniqueID { return action.version } +func (action *LeaderAction) PartStats() map[int64]int64 { + return action.partStatsVersions +} + +func (action *LeaderAction) String() string { + partStatsStr := "" + if action.PartStats() != nil { + partStatsStr = fmt.Sprintf("%v", action.PartStats()) + } + return action.BaseAction.String() + fmt.Sprintf(`{[leaderID=%v][segmentID=%d][version=%d][partStats=%s]}`, + action.GetLeaderID(), action.SegmentID(), action.Version(), partStatsStr) +} + func (action *LeaderAction) GetLeaderID() typeutil.UniqueID { return action.leaderID } @@ -237,7 +260,7 @@ func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool { case ActionTypeReduce: return action.rpcReturned.Load() && (dist == nil || dist.NodeID != action.Node()) case ActionTypeUpdate: - return action.rpcReturned.Load() && (dist != nil && reflect.DeepEqual(action.partStatsVersions, view.PartitionStatsVersions)) + return action.rpcReturned.Load() && common.MapEquals(action.partStatsVersions, view.PartitionStatsVersions) } return false } diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go index d3f6d205aa998..ed7431a539b7b 100644 --- a/internal/querycoordv2/task/task.go +++ b/internal/querycoordv2/task/task.go @@ -280,15 +280,8 @@ func (task *baseTask) SetReason(reason string) { func (task *baseTask) String() string { var actionsStr string - for i, action := range task.actions { - if realAction, ok := action.(*SegmentAction); ok { - actionsStr += fmt.Sprintf(`{[type=%v][node=%d][streaming=%v]}`, action.Type(), action.Node(), realAction.Scope() == querypb.DataScope_Streaming) - } else { - actionsStr += fmt.Sprintf(`{[type=%v][node=%d]}`, action.Type(), action.Node()) - } - if i != len(task.actions)-1 { - actionsStr += ", " - } + for _, action := range task.actions { + actionsStr += action.String() + "," } return fmt.Sprintf( "[id=%d] [type=%s] [source=%s] [reason=%s] [collectionID=%d] [replicaID=%d] [resourceGroup=%s] [priority=%s] [actionsCount=%d] [actions=%s]", diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 64cb5edf35687..8db4345f7b595 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -825,7 +825,8 @@ func (sd *shardDelegator) loadPartitionStats(ctx context.Context, partStatsVersi defer sd.partitionStatsMut.Unlock() sd.partitionStats[partID] = partStats }() - log.Info("Updated partitionStats for partition", zap.Int64("partitionID", partID)) + log.Info("Updated partitionStats for partition", zap.Int64("collectionID", sd.collectionID), zap.Int64("partitionID", partID), + zap.Int64("newVersion", newVersion), zap.Int64("oldVersion", curStats.GetVersion())) } } diff --git a/internal/querynodev2/delegator/segment_pruner.go b/internal/querynodev2/delegator/segment_pruner.go index a8155ce7c7c88..a1ea6129cb3c9 100644 --- a/internal/querynodev2/delegator/segment_pruner.go +++ b/internal/querynodev2/delegator/segment_pruner.go @@ -85,12 +85,12 @@ func PruneSegments(ctx context.Context, plan := planpb.PlanNode{} err := proto.Unmarshal(expr, &plan) if err != nil { - log.Error("failed to unmarshall serialized expr from bytes, failed the operation") + log.Ctx(ctx).Error("failed to unmarshall serialized expr from bytes, failed the operation") return } expr, err := exprutil.ParseExprFromPlan(&plan) if err != nil { - log.Error("failed to parse expr from plan, failed the operation") + log.Ctx(ctx).Error("failed to parse expr from plan, failed the operation") return } targetRanges, matchALL := exprutil.ParseRanges(expr, exprutil.ClusteringKey) @@ -123,7 +123,8 @@ func PruneSegments(ctx context.Context, metrics.QueryNodeSegmentPruneRatio. WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(typeutil.IsVectorType(clusteringKeyField.GetDataType()))). Observe(float64(realFilteredSegments / totalSegNum)) - log.Debug("Pruned segment for search/query", + log.Ctx(ctx).Debug("Pruned segment for search/query", + zap.Int("filtered_segment_num[stats]", len(filteredSegments)), zap.Int("filtered_segment_num[excluded]", realFilteredSegments), zap.Int("total_segment_num", totalSegNum), zap.Float32("filtered_ratio", float32(realFilteredSegments)/float32(totalSegNum)), diff --git a/internal/storage/partition_stats.go b/internal/storage/partition_stats.go index 7a49953eb9183..d7e3893e8097e 100644 --- a/internal/storage/partition_stats.go +++ b/internal/storage/partition_stats.go @@ -46,6 +46,9 @@ func NewPartitionStatsSnapshot() *PartitionStatsSnapshot { } func (ps *PartitionStatsSnapshot) GetVersion() int64 { + if ps == nil { + return 0 + } return ps.Version } diff --git a/pkg/common/map.go b/pkg/common/map.go index e5c9d21621332..4c9def2aa4ff5 100644 --- a/pkg/common/map.go +++ b/pkg/common/map.go @@ -22,3 +22,16 @@ func (m Str2Str) Equal(other Str2Str) bool { func CloneStr2Str(m Str2Str) Str2Str { return m.Clone() } + +func MapEquals(m1, m2 map[int64]int64) bool { + if len(m1) != len(m2) { + return false + } + for k1, v1 := range m1 { + v2, exist := m2[k1] + if !exist || v1 != v2 { + return false + } + } + return true +} diff --git a/pkg/common/map_test.go b/pkg/common/map_test.go index 2703609f653ac..e84065d4741f6 100644 --- a/pkg/common/map_test.go +++ b/pkg/common/map_test.go @@ -35,3 +35,25 @@ func TestCloneStr2Str(t *testing.T) { }) } } + +func TestMapEqual(t *testing.T) { + { + m1 := map[int64]int64{1: 11, 2: 22, 3: 33} + m2 := map[int64]int64{1: 11, 2: 22, 3: 33} + assert.True(t, MapEquals(m1, m2)) + } + { + m1 := map[int64]int64{1: 11, 2: 23, 3: 33} + m2 := map[int64]int64{1: 11, 2: 22, 3: 33} + assert.False(t, MapEquals(m1, m2)) + } + { + m1 := map[int64]int64{1: 11, 2: 23, 3: 33} + m2 := map[int64]int64{1: 11, 2: 22} + assert.False(t, MapEquals(m1, m2)) + } + { + m1 := map[int64]int64{1: 11, 2: 23, 3: 33} + assert.False(t, MapEquals(m1, nil)) + } +}