diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index f7c1c5733cbd7..cdccf866a21c8 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" @@ -734,6 +735,10 @@ func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog { for _, id := range logIDs { l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogID: id}) } + err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l}) + if err != nil { + panic(err) + } return l } @@ -745,6 +750,10 @@ func getFieldBinlogPaths(fieldID int64, paths ...string) *datapb.FieldBinlog { for _, path := range paths { l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogPath: path}) } + err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l}) + if err != nil { + panic(err) + } return l } @@ -756,6 +765,10 @@ func getFieldBinlogIDsWithEntry(fieldID int64, entry int64, logIDs ...int64) *da for _, id := range logIDs { l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogID: id, EntriesNum: entry}) } + err := binlog.CompressFieldBinlogs([]*datapb.FieldBinlog{l}) + if err != nil { + panic(err) + } return l } diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index d90b1b015018a..92c2350e6a782 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -679,8 +679,8 @@ func TestUpdateSegmentsInfo(t *testing.T) { segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Growing, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, }} err = meta.AddSegment(context.TODO(), segment1) assert.NoError(t, err) @@ -690,7 +690,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { AddBinlogsOperator(1, []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)}, []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1)}}}}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1), LogID: 2}}}}, ), UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}), UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}), @@ -730,8 +730,8 @@ func TestUpdateSegmentsInfo(t *testing.T) { // normal segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ ID: 1, State: commonpb.SegmentState_Flushed, - Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, + Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, }} err = meta.AddSegment(context.TODO(), segment1) assert.NoError(t, err) @@ -829,9 +829,9 @@ func TestUpdateSegmentsInfo(t *testing.T) { err = meta.UpdateSegmentsInfo( UpdateStatusOperator(1, commonpb.SegmentState_Flushing), AddBinlogsOperator(1, - []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - []*datapb.FieldBinlog{getFieldBinlogIDs(1, 0)}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1)}}}}, + []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1), LogID: 2}}}}, ), UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}), UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}), diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index feb3d0eabedef..ffdbc1cdcd283 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -321,14 +321,17 @@ func TestGetSegmentInfo(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801), + LogID: 801, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802), + LogID: 802, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803), + LogID: 803, }, }, }, @@ -1822,14 +1825,17 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903), + LogID: 903, }, }, }, @@ -1842,10 +1848,12 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802), + LogID: 802, }, }, }, @@ -1919,14 +1927,17 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903), + LogID: 903, }, }, }, @@ -1939,10 +1950,12 @@ func TestGetRecoveryInfo(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802), + LogID: 802, }, }, }, diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index ddb813acfa922..5454c08e93814 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -1139,14 +1139,17 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903), + LogID: 903, }, }, }, @@ -1159,10 +1162,12 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802), + LogID: 802, }, }, }, @@ -1239,14 +1244,17 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901), + LogID: 901, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902), + LogID: 902, }, { EntriesNum: 20, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903), + LogID: 903, }, }, }, @@ -1259,10 +1267,12 @@ func TestGetRecoveryInfoV2(t *testing.T) { { EntriesNum: 30, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801), + LogID: 801, }, { EntriesNum: 70, LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802), + LogID: 802, }, }, }, @@ -1309,9 +1319,11 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), + LogID: 801, }, { LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), + LogID: 801, }, }, }, @@ -1322,9 +1334,11 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), + LogID: 10000, }, { LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), + LogID: 10000, }, }, }, @@ -1337,6 +1351,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { TimestampTo: 1, LogPath: metautil.BuildDeltaLogPath("a", 0, 100, 0, 100000), LogSize: 1, + LogID: 100000, }, }, },