From 04840f73921c16cd18189518d46a3c4de3ce9d40 Mon Sep 17 00:00:00 2001 From: aoiasd Date: Mon, 30 Dec 2024 16:54:00 +0800 Subject: [PATCH] fix bm25 import segment loss stats Signed-off-by: aoiasd --- internal/datacoord/import_scheduler.go | 2 +- internal/datacoord/meta.go | 3 ++- internal/datanode/importv2/task.go | 1 + internal/datanode/importv2/util.go | 3 ++- internal/flushcommon/syncmgr/task.go | 4 ++-- internal/proto/data_coord.proto | 1 + .../server/flusher/flusherimpl/channel_lifetime.go | 2 +- 7 files changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index e5b7585e2fb82..8e8f655ece1f2 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -320,7 +320,7 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { WrapTaskLog(task, zap.Int64("segmentID", info.GetSegmentID()), zap.Error(err))...) return } - op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs()) + op1 := UpdateBinlogsOperator(info.GetSegmentID(), info.GetBinlogs(), info.GetStatslogs(), info.GetDeltalogs(), info.GetBm25Logs()) op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed) err = s.meta.UpdateSegmentsInfo(context.TODO(), op1, op2) if err != nil { diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index aab1f9a41e78e..f90c629941fb4 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -921,7 +921,7 @@ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs } } -func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb.FieldBinlog) UpdateOperator { +func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs []*datapb.FieldBinlog) UpdateOperator { return func(modPack *updateSegmentPack) bool { segment := modPack.Get(segmentID) if segment == nil { @@ -933,6 +933,7 @@ func UpdateBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*dat segment.Binlogs = binlogs segment.Statslogs = statslogs segment.Deltalogs = deltalogs + segment.Bm25Statslogs = bm25logs modPack.increments[segmentID] = metastore.BinlogsIncrement{ Segment: segment.SegmentInfo, } diff --git a/internal/datanode/importv2/task.go b/internal/datanode/importv2/task.go index 0d7c46e6cc474..7bde8b0ac56d1 100644 --- a/internal/datanode/importv2/task.go +++ b/internal/datanode/importv2/task.go @@ -143,6 +143,7 @@ func UpdateSegmentInfo(info *datapb.ImportSegmentInfo) UpdateAction { segmentsInfo[segment].Binlogs = mergeFn(segmentsInfo[segment].Binlogs, info.GetBinlogs()) segmentsInfo[segment].Statslogs = mergeFn(segmentsInfo[segment].Statslogs, info.GetStatslogs()) segmentsInfo[segment].Deltalogs = mergeFn(segmentsInfo[segment].Deltalogs, info.GetDeltalogs()) + segmentsInfo[segment].Statslogs = mergeFn(segmentsInfo[segment].Bm25Logs, info.GetBm25Logs()) return } segmentsInfo[segment] = info diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index eb6c592f85b12..2f44e24bf7f93 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -105,7 +105,7 @@ func NewSyncTask(ctx context.Context, func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache.MetaCache) (*datapb.ImportSegmentInfo, error) { segmentID := syncTask.SegmentID() - insertBinlogs, statsBinlog, deltaLog := syncTask.(*syncmgr.SyncTask).Binlogs() + insertBinlogs, statsBinlog, bm25Log, deltaLog := syncTask.(*syncmgr.SyncTask).Binlogs() metaCache := metaCaches[syncTask.ChannelName()] segment, ok := metaCache.GetSegmentByID(segmentID) if !ok { @@ -120,6 +120,7 @@ func NewImportSegmentInfo(syncTask syncmgr.Task, metaCaches map[string]metacache ImportedRows: segment.FlushedRows(), Binlogs: lo.Values(insertBinlogs), Statslogs: lo.Values(statsBinlog), + Bm25Logs: lo.Values(bm25Log), Deltalogs: deltaLogs, }, nil } diff --git a/internal/flushcommon/syncmgr/task.go b/internal/flushcommon/syncmgr/task.go index e5c2f913cd0c6..d103d300900aa 100644 --- a/internal/flushcommon/syncmgr/task.go +++ b/internal/flushcommon/syncmgr/task.go @@ -414,8 +414,8 @@ func (t *SyncTask) IsFlush() bool { return t.isFlush } -func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) { - return t.insertBinlogs, t.statsBinlogs, t.deltaBinlog +func (t *SyncTask) Binlogs() (map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, map[int64]*datapb.FieldBinlog, *datapb.FieldBinlog) { + return t.insertBinlogs, t.statsBinlogs, t.bm25Binlogs, t.deltaBinlog } func (t *SyncTask) MarshalJSON() ([]byte, error) { diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 5d78f28cf0ddf..c7ca6ab8ce9c3 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -850,6 +850,7 @@ message ImportSegmentInfo { repeated FieldBinlog binlogs = 3; repeated FieldBinlog statslogs = 4; repeated FieldBinlog deltalogs = 5; + repeated FieldBinlog bm25logs = 6; } message QueryImportResponse { diff --git a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go index 51965267f56fb..cfe69d68d4e28 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go +++ b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go @@ -131,7 +131,7 @@ func (c *channelLifetime) Run() error { return } if tt, ok := t.(*syncmgr.SyncTask); ok { - insertLogs, _, _ := tt.Binlogs() + insertLogs, _, _, _ := tt.Binlogs() resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(), stats.SyncOperationMetrics{ BinLogCounterIncr: 1, BinLogFileCounterIncr: uint64(len(insertLogs)),