feat: support embedding bm25 sparse vector and flush bm25 stats log (milvus-io#36036)

Related: milvus-io#35853

---------

Signed-off-by: aoiasd <[email protected]>
aoiasd authored Sep 19, 2024
1 parent c0317ce commit 1397873
Showing 59 changed files with 1,718 additions and 379 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_task_l0.go
@@ -411,7 +411,7 @@ func (t *l0CompactionTask) saveSegmentMeta() error {
result := t.result
var operators []UpdateOperator
for _, seg := range result.GetSegments() {
- operators = append(operators, AddBinlogsOperator(seg.GetSegmentID(), nil, nil, seg.GetDeltalogs()))
+ operators = append(operators, AddBinlogsOperator(seg.GetSegmentID(), nil, nil, seg.GetDeltalogs(), nil))
}

for _, segID := range t.InputSegments {
3 changes: 2 additions & 1 deletion internal/datacoord/meta.go
@@ -863,7 +863,7 @@ func RevertSegmentPartitionStatsVersionOperator(segmentID int64) UpdateOperator
}

// Add binlogs in segmentInfo
- func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb.FieldBinlog) UpdateOperator {
+ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs, bm25logs []*datapb.FieldBinlog) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
@@ -875,6 +875,7 @@ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb
segment.Binlogs = mergeFieldBinlogs(segment.GetBinlogs(), binlogs)
segment.Statslogs = mergeFieldBinlogs(segment.GetStatslogs(), statslogs)
segment.Deltalogs = mergeFieldBinlogs(segment.GetDeltalogs(), deltalogs)
+ segment.Bm25Statslogs = mergeFieldBinlogs(segment.GetBm25Statslogs(), bm25logs)
modPack.increments[segmentID] = metastore.BinlogsIncrement{
Segment: segment.SegmentInfo,
}
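Every AddBinlogsOperator call site now supplies the extra bm25logs slice, passing nil or an empty slice when a segment has no BM25 output to record (for example, L0 compaction results, which only carry deltalogs). A minimal sketch of the two call shapes; the variable names are illustrative placeholders, not values from this commit:

// Sketch only: segmentID, binlogs, statslogs, deltalogs, and bm25logs
// are hypothetical placeholders.
operators := []UpdateOperator{
    // Flush path: record BM25 stats logs alongside the other binlogs.
    AddBinlogsOperator(segmentID, binlogs, statslogs, deltalogs, bm25logs),
    // L0 compaction path: only deltalogs exist, so bm25logs is nil.
    AddBinlogsOperator(segmentID, nil, nil, deltalogs, nil),
}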
4 changes: 3 additions & 1 deletion internal/datacoord/meta_test.go
@@ -675,6 +675,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
[]*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}},
+ []*datapb.FieldBinlog{},
),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),
@@ -735,7 +736,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
assert.NoError(t, err)

err = meta.UpdateSegmentsInfo(
- AddBinlogsOperator(1, nil, nil, nil),
+ AddBinlogsOperator(1, nil, nil, nil, nil),
)
assert.NoError(t, err)

@@ -816,6 +817,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}},
+ []*datapb.FieldBinlog{},
),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),
2 changes: 1 addition & 1 deletion internal/datacoord/services.go
@@ -549,7 +549,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath

// save binlogs, start positions and checkpoints
operators = append(operators,
- AddBinlogsOperator(req.GetSegmentID(), req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs()),
+ AddBinlogsOperator(req.GetSegmentID(), req.GetField2BinlogPaths(), req.GetField2StatslogPaths(), req.GetDeltalogs(), req.GetField2Bm25LogPaths()),
UpdateStartPosition(req.GetStartPositions()),
UpdateCheckPointOperator(req.GetSegmentID(), req.GetCheckPoints()),
)
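The handler reads the new paths via GetField2Bm25LogPaths, which implies a matching Field2Bm25LogPaths field on the request message. A hedged sketch of how a DataNode might populate it; the SaveBinlogPathsRequest type name and all field values are assumptions for illustration:

// Sketch: request shape implied by the getters used above. The type
// name and all values are assumed, not copied from this commit.
req := &datapb.SaveBinlogPathsRequest{
    SegmentID:           segmentID,
    Field2BinlogPaths:   binlogs,
    Field2StatslogPaths: statslogs,
    Deltalogs:           deltalogs,
    Field2Bm25LogPaths:  bm25logs, // new with this commit
}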
47 changes: 47 additions & 0 deletions internal/datanode/compaction/load_stats.go
@@ -21,6 +21,7 @@ import (
"path"
"time"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -30,6 +31,52 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

+func LoadBM25Stats(ctx context.Context, chunkManager storage.ChunkManager, schema *schemapb.CollectionSchema, segmentID int64, statsBinlogs []*datapb.FieldBinlog) (map[int64]*storage.BM25Stats, error) {
+    startTs := time.Now()
+    log := log.With(zap.Int64("segmentID", segmentID))
+    log.Info("begin to reload history BM25 stats", zap.Int("statsBinLogsLen", len(statsBinlogs)))
+
+    fieldList, fieldOffset := make([]int64, len(statsBinlogs)), make([]int, len(statsBinlogs))
+    logpaths := make([]string, 0)
+    for i, binlog := range statsBinlogs {
+        fieldList[i] = binlog.FieldID
+        fieldOffset[i] = len(binlog.Binlogs)
+        logpaths = append(logpaths, lo.Map(binlog.Binlogs, func(log *datapb.Binlog, _ int) string { return log.GetLogPath() })...)
+    }
+
+    if len(logpaths) == 0 {
+        log.Warn("no BM25 stats to load")
+        return nil, nil
+    }
+
+    values, err := chunkManager.MultiRead(ctx, logpaths)
+    if err != nil {
+        log.Warn("failed to load BM25 stats files", zap.Error(err))
+        return nil, err
+    }
+
+    result := make(map[int64]*storage.BM25Stats)
+    cnt := 0
+    for i, fieldID := range fieldList {
+        for offset := 0; offset < fieldOffset[i]; offset++ {
+            stats, ok := result[fieldID]
+            if !ok {
+                stats = storage.NewBM25Stats()
+                result[fieldID] = stats
+            }
+            err := stats.Deserialize(values[cnt+offset])
+            if err != nil {
+                return nil, err
+            }
+        }
+        cnt += fieldOffset[i]
+    }
+
+    // TODO: add metric for BM25 load time
+    log.Info("Successfully loaded BM25 stats", zap.Any("time", time.Since(startTs)))
+    return result, nil
+}
+
func LoadStats(ctx context.Context, chunkManager storage.ChunkManager, schema *schemapb.CollectionSchema, segmentID int64, statsBinlogs []*datapb.FieldBinlog) ([]*storage.PkStatistics, error) {
startTs := time.Now()
log := log.With(zap.Int64("segmentID", segmentID))
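LoadBM25Stats mirrors the existing LoadStats helper: it collects every BM25 binlog path per field, bulk-reads them through the chunk manager, and deserializes each blob into a single accumulated storage.BM25Stats per field (repeated Deserialize calls on the same stats object merge the binlogs). A sketch of a plausible caller; cm, collSchema, and seg are illustrative stand-ins, not names from this commit:

// Hypothetical caller: restore per-field BM25 stats for one segment.
statsMap, err := LoadBM25Stats(ctx, cm, collSchema, seg.GetID(), seg.GetBm25Statslogs())
if err != nil {
    return err
}
if statsMap == nil {
    return nil // no BM25 binlogs recorded; the helper returns nil, nil
}
// statsMap now holds one merged *storage.BM25Stats per BM25 field ID.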
2 changes: 1 addition & 1 deletion internal/datanode/compaction/mix_compactor_test.go
@@ -189,7 +189,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
PartitionID: PartitionID,
ID: 99999,
NumOfRows: 0,
- }, pkoracle.NewBloomFilterSet())
+ }, pkoracle.NewBloomFilterSet(), nil)

s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: seg.SegmentID(),
4 changes: 2 additions & 2 deletions internal/datanode/importv2/util.go
@@ -63,7 +63,7 @@ func NewSyncTask(ctx context.Context,
}, func(info *datapb.SegmentInfo) pkoracle.PkStat {
bfs := pkoracle.NewBloomFilterSet()
return bfs
- })
+ }, metacache.NewBM25StatsFactory)
}

var serializer syncmgr.Serializer
@@ -248,7 +248,7 @@ func NewMetaCache(req *datapb.ImportRequest) map[string]metacache.MetaCache {
}
metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
metaCaches[channel] = metaCache
}
return metaCaches
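Both constructors now take a BM25 stats factory next to the PkStat factory: metacache.NewBM25StatsFactory when the collection has BM25 function outputs to track, metacache.NoneBm25StatsFactory when it does not (as in most of the tests below). The committed factory type is not part of this excerpt; a rough sketch of the shape the call sites imply, with the return type an outright assumption:

// Inferred sketch, not the committed definition: a factory maps a
// segment descriptor to whatever BM25 state the metacache tracks.
// The return type here is assumed for illustration only.
type bm25StatsFactory func(segment *datapb.SegmentInfo) map[int64]*storage.BM25Stats

// A no-op variant in the spirit of NoneBm25StatsFactory.
func noneBm25StatsSketch(segment *datapb.SegmentInfo) map[int64]*storage.BM25Stats {
    return nil // nothing to track for collections without BM25 outputs
}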
40 changes: 20 additions & 20 deletions internal/datanode/services_test.go
@@ -373,7 +373,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
PartitionID: 2,
State: commonpb.SegmentState_Growing,
StartPosition: &msgpb.MsgPosition{},
- }, func(_ *datapb.SegmentInfo) pkoracle.PkStat { return pkoracle.NewBloomFilterSet() })
+ }, func(_ *datapb.SegmentInfo) pkoracle.PkStat { return pkoracle.NewBloomFilterSet() }, metacache.NoneBm25StatsFactory)

s.Run("service_not_ready", func() {
ctx, cancel := context.WithCancel(context.Background())
@@ -637,7 +637,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
@@ -648,7 +648,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L0,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
@@ -659,7 +659,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 102,
CollectionID: 1,
@@ -670,7 +670,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L0,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 103,
CollectionID: 1,
@@ -681,7 +681,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L0,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
@@ -759,7 +759,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
@@ -770,7 +770,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
@@ -781,7 +781,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
@@ -847,7 +847,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
@@ -858,7 +858,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
@@ -869,7 +869,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
@@ -935,7 +935,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
@@ -946,7 +946,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
@@ -957,7 +957,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 102,
CollectionID: 1,
@@ -968,7 +968,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
@@ -1028,7 +1028,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
@@ -1039,7 +1039,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L0,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
@@ -1050,7 +1050,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
@@ -1110,7 +1110,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
- })
+ }, metacache.NoneBm25StatsFactory)
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
6 changes: 6 additions & 0 deletions internal/flushcommon/metacache/actions.go
@@ -159,6 +159,12 @@ func RollStats(newStats ...*storage.PrimaryKeyStats) SegmentAction {
}
}

+func MergeBm25Stats(newStats map[int64]*storage.BM25Stats) SegmentAction {
+    return func(info *SegmentInfo) {
+        info.bm25stats.Merge(newStats)
+    }
+}

func StartSyncing(batchSize int64) SegmentAction {
return func(info *SegmentInfo) {
info.syncingRows += batchSize
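MergeBm25Stats follows the same SegmentAction pattern as RollStats above, so it composes with the metacache's update mechanism. A sketch of applying it after a flush; UpdateSegments and WithSegmentIDs are assumed from the existing metacache surface, and newStats is a placeholder:

// Sketch: fold freshly flushed BM25 stats into one segment's cache
// entry. newStats maps field ID -> *storage.BM25Stats.
cache.UpdateSegments(
    MergeBm25Stats(newStats),
    WithSegmentIDs(segmentID),
)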
