diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index 0eb0d6dd1b5a6..77fdda14dc859 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -121,6 +121,11 @@ func (c *LeaderChecker) findNeedSyncPartitionStats(ctx context.Context, replica psVersionInLView := partStatsInLView[partID] if psVersionInLView < psVersionInTarget { partStatsToUpdate[partID] = psVersionInTarget + } else { + log.RatedDebug(60, "no need to update part stats for partition", + zap.Int64("partitionID", partID), + zap.Int64("psVersionInLView", psVersionInLView), + zap.Int64("psVersionInTarget", psVersionInTarget)) } } if len(partStatsToUpdate) > 0 { @@ -139,6 +144,9 @@ func (c *LeaderChecker) findNeedSyncPartitionStats(ctx context.Context, replica t.SetPriority(task.TaskPriorityLow) t.SetReason("sync partition stats versions") ret = append(ret, t) + log.Debug("Created leader actions for partitionStats", + zap.Int64("collectionID", leaderView.CollectionID), + zap.Any("action", action.String())) } return ret diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 6d1eb062e4174..f6851f2326511 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -113,18 +113,26 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool mgr.current.updateCollectionTarget(collectionID, newTarget) mgr.next.removeCollectionTarget(collectionID) - log.Debug("finish to update current target for collection", - zap.Int64s("segments", newTarget.GetAllSegmentIDs()), - zap.Strings("channels", newTarget.GetAllDmChannelNames()), - zap.Int64("version", newTarget.GetTargetVersion()), - ) + partStatsVersionInfo := "partitionStats:" for channelName, dmlChannel := range newTarget.dmChannels { ts, _ := tsoutil.ParseTS(dmlChannel.GetSeekPosition().GetTimestamp()) metrics.QueryCoordCurrentTargetCheckpointUnixSeconds.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), channelName, ).Set(float64(ts.Unix())) + partStatsVersionInfo += fmt.Sprintf("%s:[", channelName) + partStatsVersion := dmlChannel.PartitionStatsVersions + for partID, statVersion := range partStatsVersion { + partStatsVersionInfo += fmt.Sprintf("%d:%d,", partID, statVersion) + } + partStatsVersionInfo += "]," } + log.Debug("finish to update current target for collection", + zap.Int64s("segments", newTarget.GetAllSegmentIDs()), + zap.Strings("channels", newTarget.GetAllDmChannelNames()), + zap.Int64("version", newTarget.GetTargetVersion()), + zap.String("partStatsVersion", partStatsVersionInfo), + ) return true } diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index f0ee9dec318c6..1d11b4c5add68 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -796,8 +796,14 @@ func (sd *shardDelegator) loadPartitionStats(ctx context.Context, partStatsVersi colID := sd.Collection() log := log.Ctx(ctx) for partID, newVersion := range partStatsVersions { - curStats, exist := sd.partitionStats[partID] - if exist && curStats.Version >= newVersion { + var curStats *storage.PartitionStatsSnapshot + var exist bool + func() { + sd.partitionStatsMut.RLock() + defer sd.partitionStatsMut.RUnlock() + curStats, exist = sd.partitionStats[partID] + }() + if exist && curStats != nil && curStats.Version >= newVersion { log.RatedWarn(60, "Input partition stats' version is less or equal than current partition stats, skip", zap.Int64("partID", partID), zap.Int64("curVersion", curStats.Version),