From a47abb2f2be49f195500ec3c7da94b0053516a8d Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Sun, 29 Sep 2024 16:53:15 +0800 Subject: [PATCH] fix: stats log lost after disable stats log loading on flush (#36592) issue: #36555 Signed-off-by: chyezh --- .../flushcommon/pipeline/data_sync_service.go | 41 ++++++++----------- 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/internal/flushcommon/pipeline/data_sync_service.go b/internal/flushcommon/pipeline/data_sync_service.go index 649345e0f92ed..d46ef69f97e57 100644 --- a/internal/flushcommon/pipeline/data_sync_service.go +++ b/internal/flushcommon/pipeline/data_sync_service.go @@ -132,6 +132,10 @@ func (dsService *DataSyncService) GetMetaCache() metacache.MetaCache { return dsService.metacache } +func getMetaCacheForStreaming(initCtx context.Context, params *util.PipelineParams, info *datapb.ChannelWatchInfo, unflushed, flushed []*datapb.SegmentInfo) (metacache.MetaCache, error) { + return initMetaCache(initCtx, params.ChunkManager, info, nil, unflushed, flushed) +} + func getMetaCacheWithTickler(initCtx context.Context, params *util.PipelineParams, info *datapb.ChannelWatchInfo, tickler *util.Tickler, unflushed, flushed []*datapb.SegmentInfo) (metacache.MetaCache, error) { tickler.SetTotal(int32(len(unflushed) + len(flushed))) return initMetaCache(initCtx, params.ChunkManager, info, tickler, unflushed, flushed) @@ -161,7 +165,7 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i return nil, err } segmentPks.Insert(segment.GetID(), pkoracle.NewBloomFilterSet(stats...)) - if !streamingutil.IsStreamingServiceEnabled() { + if tickler != nil { tickler.Inc() } @@ -180,8 +184,11 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i } } + // growing segments's stats should always be loaded, for generating merged pk bf. loadSegmentStats("growing", unflushed) - loadSegmentStats("sealed", flushed) + if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) { + loadSegmentStats("sealed", flushed) + } // use fetched segment info info.Vchan.FlushedSegments = flushed @@ -344,22 +351,10 @@ func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelinePa } } - if paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool() { - // In SkipBFStatsLoad mode, flushed segments no longer maintain a bloom filter. - // So, here we skip loading the bloom filter for flushed segments. - info.Vchan.FlushedSegments = flushedSegmentInfos - info.Vchan.UnflushedSegments = unflushedSegmentInfos - metaCache = metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat { - return pkoracle.NewBloomFilterSet() - }, metacache.NoneBm25StatsFactory) - } else { - // init metaCache meta - metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos) - if err != nil { - return nil, err - } + // init metaCache meta + if metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos); err != nil { + return nil, err } - return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil) } @@ -367,6 +362,7 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut // recover segment checkpoints var ( err error + metaCache metacache.MetaCache unflushedSegmentInfos []*datapb.SegmentInfo flushedSegmentInfos []*datapb.SegmentInfo ) @@ -383,13 +379,10 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut } } - // In streaming service mode, flushed segments no longer maintain a bloom filter. - // So, here we skip loading the bloom filter for flushed segments. - info.Vchan.UnflushedSegments = unflushedSegmentInfos - metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat { - return pkoracle.NewBloomFilterSet() - }, metacache.NoneBm25StatsFactory) - + // init metaCache meta + if metaCache, err = getMetaCacheForStreaming(initCtx, pipelineParams, info, unflushedSegmentInfos, flushedSegmentInfos); err != nil { + return nil, err + } return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input) }