Skip to content

Commit

Permalink
fix: stats log lost after disable stats log loading on flush (milvus-…
Browse files Browse the repository at this point in the history
…io#36592)

issue: milvus-io#36555

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Sep 29, 2024
1 parent a6545b2 commit a47abb2
Showing 1 changed file with 17 additions and 24 deletions.
41 changes: 17 additions & 24 deletions internal/flushcommon/pipeline/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ func (dsService *DataSyncService) GetMetaCache() metacache.MetaCache {
return dsService.metacache
}

func getMetaCacheForStreaming(initCtx context.Context, params *util.PipelineParams, info *datapb.ChannelWatchInfo, unflushed, flushed []*datapb.SegmentInfo) (metacache.MetaCache, error) {
return initMetaCache(initCtx, params.ChunkManager, info, nil, unflushed, flushed)
}

func getMetaCacheWithTickler(initCtx context.Context, params *util.PipelineParams, info *datapb.ChannelWatchInfo, tickler *util.Tickler, unflushed, flushed []*datapb.SegmentInfo) (metacache.MetaCache, error) {
tickler.SetTotal(int32(len(unflushed) + len(flushed)))
return initMetaCache(initCtx, params.ChunkManager, info, tickler, unflushed, flushed)
Expand Down Expand Up @@ -161,7 +165,7 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i
return nil, err
}
segmentPks.Insert(segment.GetID(), pkoracle.NewBloomFilterSet(stats...))
if !streamingutil.IsStreamingServiceEnabled() {
if tickler != nil {
tickler.Inc()
}

Expand All @@ -180,8 +184,11 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i
}
}

// growing segments's stats should always be loaded, for generating merged pk bf.
loadSegmentStats("growing", unflushed)
loadSegmentStats("sealed", flushed)
if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) {
loadSegmentStats("sealed", flushed)
}

// use fetched segment info
info.Vchan.FlushedSegments = flushed
Expand Down Expand Up @@ -344,29 +351,18 @@ func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelinePa
}
}

if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() {
// In SkipBFStatsLoad mode, flushed segments no longer maintain a bloom filter.
// So, here we skip loading the bloom filter for flushed segments.
info.Vchan.FlushedSegments = flushedSegmentInfos
info.Vchan.UnflushedSegments = unflushedSegmentInfos
metaCache = metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
}, metacache.NoneBm25StatsFactory)
} else {
// init metaCache meta
metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos)
if err != nil {
return nil, err
}
// init metaCache meta
if metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos); err != nil {
return nil, err
}

return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil)
}

func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *util.PipelineParams, info *datapb.ChannelWatchInfo, input <-chan *msgstream.MsgPack) (*DataSyncService, error) {
// recover segment checkpoints
var (
err error
metaCache metacache.MetaCache
unflushedSegmentInfos []*datapb.SegmentInfo
flushedSegmentInfos []*datapb.SegmentInfo
)
Expand All @@ -383,13 +379,10 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut
}
}

// In streaming service mode, flushed segments no longer maintain a bloom filter.
// So, here we skip loading the bloom filter for flushed segments.
info.Vchan.UnflushedSegments = unflushedSegmentInfos
metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
}, metacache.NoneBm25StatsFactory)

// init metaCache meta
if metaCache, err = getMetaCacheForStreaming(initCtx, pipelineParams, info, unflushedSegmentInfos, flushedSegmentInfos); err != nil {
return nil, err
}
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input)
}

Expand Down

0 comments on commit a47abb2

Please sign in to comment.