Skip to content

Commit

Permalink
enhance: add log for partition stats( milvus-io#30376) (milvus-io#35219)
Browse files Browse the repository at this point in the history
related:  milvus-io#30376

Signed-off-by: MrPresent-Han <[email protected]>
Co-authored-by: MrPresent-Han <[email protected]>
Signed-off-by: Sumit Dubey <[email protected]>
  • Loading branch information
2 people authored and sumitd2 committed Aug 6, 2024
1 parent f71f041 commit fb52aa0
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 7 deletions.
8 changes: 8 additions & 0 deletions internal/querycoordv2/checkers/leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (c *LeaderChecker) findNeedSyncPartitionStats(ctx context.Context, replica
psVersionInLView := partStatsInLView[partID]
if psVersionInLView < psVersionInTarget {
partStatsToUpdate[partID] = psVersionInTarget
} else {
log.RatedDebug(60, "no need to update part stats for partition",
zap.Int64("partitionID", partID),
zap.Int64("psVersionInLView", psVersionInLView),
zap.Int64("psVersionInTarget", psVersionInTarget))
}
}
if len(partStatsToUpdate) > 0 {
Expand All @@ -139,6 +144,9 @@ func (c *LeaderChecker) findNeedSyncPartitionStats(ctx context.Context, replica
t.SetPriority(task.TaskPriorityLow)
t.SetReason("sync partition stats versions")
ret = append(ret, t)
log.Debug("Created leader actions for partitionStats",
zap.Int64("collectionID", leaderView.CollectionID),
zap.Any("action", action.String()))
}

return ret
Expand Down
18 changes: 13 additions & 5 deletions internal/querycoordv2/meta/target_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,26 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool
mgr.current.updateCollectionTarget(collectionID, newTarget)
mgr.next.removeCollectionTarget(collectionID)

log.Debug("finish to update current target for collection",
zap.Int64s("segments", newTarget.GetAllSegmentIDs()),
zap.Strings("channels", newTarget.GetAllDmChannelNames()),
zap.Int64("version", newTarget.GetTargetVersion()),
)
partStatsVersionInfo := "partitionStats:"
for channelName, dmlChannel := range newTarget.dmChannels {
ts, _ := tsoutil.ParseTS(dmlChannel.GetSeekPosition().GetTimestamp())
metrics.QueryCoordCurrentTargetCheckpointUnixSeconds.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
channelName,
).Set(float64(ts.Unix()))
partStatsVersionInfo += fmt.Sprintf("%s:[", channelName)
partStatsVersion := dmlChannel.PartitionStatsVersions
for partID, statVersion := range partStatsVersion {
partStatsVersionInfo += fmt.Sprintf("%d:%d,", partID, statVersion)
}
partStatsVersionInfo += "],"
}
log.Debug("finish to update current target for collection",
zap.Int64s("segments", newTarget.GetAllSegmentIDs()),
zap.Strings("channels", newTarget.GetAllDmChannelNames()),
zap.Int64("version", newTarget.GetTargetVersion()),
zap.String("partStatsVersion", partStatsVersionInfo),
)
return true
}

Expand Down
10 changes: 8 additions & 2 deletions internal/querynodev2/delegator/delegator.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,8 +796,14 @@ func (sd *shardDelegator) loadPartitionStats(ctx context.Context, partStatsVersi
colID := sd.Collection()
log := log.Ctx(ctx)
for partID, newVersion := range partStatsVersions {
curStats, exist := sd.partitionStats[partID]
if exist && curStats.Version >= newVersion {
var curStats *storage.PartitionStatsSnapshot
var exist bool
func() {
sd.partitionStatsMut.RLock()
defer sd.partitionStatsMut.RUnlock()
curStats, exist = sd.partitionStats[partID]
}()
if exist && curStats != nil && curStats.Version >= newVersion {
log.RatedWarn(60, "Input partition stats' version is less or equal than current partition stats, skip",
zap.Int64("partID", partID),
zap.Int64("curVersion", curStats.Version),
Expand Down

0 comments on commit fb52aa0

Please sign in to comment.