From c61fb1eff559158bdb4e10f5a4949274aff20c08 Mon Sep 17 00:00:00 2001 From: smellthemoon <64083300+smellthemoon@users.noreply.github.com> Date: Fri, 7 Jun 2024 10:19:51 +0800 Subject: [PATCH] enhance: do check when add not empty logpath (#33640) meta only store logid Signed-off-by: lixinguo Co-authored-by: lixinguo --- internal/datacoord/meta_test.go | 4 +- internal/datacoord/segment_info.go | 2 +- internal/datacoord/segment_manager_test.go | 4 -- internal/datacoord/server_test.go | 16 +------ internal/datacoord/services_test.go | 22 ++------- .../metastore/kv/datacoord/kv_catalog_test.go | 47 +++++++++++++++++++ internal/metastore/kv/datacoord/util.go | 3 ++ 7 files changed, 58 insertions(+), 40 deletions(-) diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 4924a4c8e7b8f..1195cd6f4b26f 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -696,7 +696,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { AddBinlogsOperator(1, []*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)}, []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1), LogID: 2}}}}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}}, ), UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}), UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}), @@ -837,7 +837,7 @@ func TestUpdateSegmentsInfo(t *testing.T) { AddBinlogsOperator(1, []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)}, - []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1), LogID: 2}}}}, + []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}}, ), UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}), UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}), diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 5f317fa70f58e..287976d9b2fb0 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -183,7 +183,7 @@ func (s *SegmentsInfo) DropSegment(segmentID UniqueID) { } // SetSegment sets SegmentInfo with segmentID, perform overwrite if already exists -// set the logPath of segement in meta empty, to save space +// set the logPath of segment in meta empty, to save space // if segment has logPath, make it empty func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) { if segment, ok := s.segments[segmentID]; ok { diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 69cb83d12f3b0..f869c719ea6f4 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -35,7 +35,6 @@ import ( "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/etcd" - "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -649,7 +648,6 @@ func TestTryToSealSegment(t *testing.T) { { EntriesNum: 10, LogID: 3, - LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 2, 3), }, }, }, @@ -674,12 +672,10 @@ func TestTryToSealSegment(t *testing.T) { { EntriesNum: 10, LogID: 1, - LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 3), }, { EntriesNum: 20, LogID: 2, - LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 2), }, }, }, diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 0d5309714a65b..e8cbe8e8d1612 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -58,7 +58,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tikv" @@ -320,17 +319,14 @@ func TestGetSegmentInfo(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801), LogID: 801, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802), LogID: 802, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803), LogID: 803, }, }, @@ -344,10 +340,10 @@ func TestGetSegmentInfo(t *testing.T) { SegmentIDs: []int64{0}, } resp, err := svr.GetSegmentInfo(svr.ctx, req) + assert.NoError(t, err) assert.Equal(t, 1, len(resp.GetInfos())) // Check that # of rows is corrected from 100 to 60. assert.EqualValues(t, 60, resp.GetInfos()[0].GetNumOfRows()) - assert.NoError(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) }) t.Run("with wrong segmentID", func(t *testing.T) { @@ -1824,17 +1820,14 @@ func TestGetRecoveryInfo(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901), LogID: 901, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902), LogID: 902, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903), LogID: 903, }, }, @@ -1847,12 +1840,10 @@ func TestGetRecoveryInfo(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 30, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801), LogID: 801, }, { EntriesNum: 70, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802), LogID: 802, }, }, @@ -1926,17 +1917,14 @@ func TestGetRecoveryInfo(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901), LogID: 901, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902), LogID: 902, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903), LogID: 903, }, }, @@ -1949,12 +1937,10 @@ func TestGetRecoveryInfo(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 30, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801), LogID: 801, }, { EntriesNum: 70, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802), LogID: 802, }, }, diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 5454c08e93814..3ecbd263242aa 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -1138,17 +1138,14 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901), LogID: 901, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902), LogID: 902, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903), LogID: 903, }, }, @@ -1161,12 +1158,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 30, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801), LogID: 801, }, { EntriesNum: 70, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802), LogID: 802, }, }, @@ -1243,17 +1238,14 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901), LogID: 901, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902), LogID: 902, }, { EntriesNum: 20, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903), LogID: 903, }, }, @@ -1266,12 +1258,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { Binlogs: []*datapb.Binlog{ { EntriesNum: 30, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801), LogID: 801, }, { EntriesNum: 70, - LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802), LogID: 802, }, }, @@ -1318,12 +1308,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { FieldID: 1, Binlogs: []*datapb.Binlog{ { - LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), - LogID: 801, + LogID: 801, }, { - LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801), - LogID: 801, + LogID: 801, }, }, }, @@ -1333,12 +1321,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { FieldID: 1, Binlogs: []*datapb.Binlog{ { - LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), - LogID: 10000, + LogID: 10000, }, { - LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000), - LogID: 10000, + LogID: 10000, }, }, }, diff --git a/internal/metastore/kv/datacoord/kv_catalog_test.go b/internal/metastore/kv/datacoord/kv_catalog_test.go index 8ff58679be1e2..ac850546a1f7f 100644 --- a/internal/metastore/kv/datacoord/kv_catalog_test.go +++ b/internal/metastore/kv/datacoord/kv_catalog_test.go @@ -313,6 +313,53 @@ func Test_AddSegments(t *testing.T) { assert.Equal(t, 4, len(savedKvs)) verifySavedKvsForSegment(t, savedKvs) }) + + t.Run("no need to store log path", func(t *testing.T) { + metakv := mocks.NewMetaKv(t) + catalog := NewCatalog(metakv, rootPath, "") + + validFieldBinlog := []*datapb.FieldBinlog{{ + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogID: 1, + LogPath: "", + }, + }, + }} + + invalidFieldBinlog := []*datapb.FieldBinlog{{ + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogID: 1, + LogPath: "no need to store", + }, + }, + }} + + segment := &datapb.SegmentInfo{ + ID: segmentID, + CollectionID: collectionID, + PartitionID: partitionID, + NumOfRows: 100, + State: commonpb.SegmentState_Flushed, + } + + segment.Statslogs = invalidFieldBinlog + err := catalog.AddSegment(context.TODO(), segment) + assert.Error(t, err) + segment.Statslogs = validFieldBinlog + + segment.Binlogs = invalidFieldBinlog + err = catalog.AddSegment(context.TODO(), segment) + assert.Error(t, err) + segment.Binlogs = validFieldBinlog + + segment.Deltalogs = invalidFieldBinlog + err = catalog.AddSegment(context.TODO(), segment) + assert.Error(t, err) + }) } func Test_AlterSegments(t *testing.T) { diff --git a/internal/metastore/kv/datacoord/util.go b/internal/metastore/kv/datacoord/util.go index 2d9292950e6a5..995631ccfa2a5 100644 --- a/internal/metastore/kv/datacoord/util.go +++ b/internal/metastore/kv/datacoord/util.go @@ -169,6 +169,9 @@ func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binl if binlog.GetLogID() == 0 { return fmt.Errorf("invalid log id, binlog:%v", binlog) } + if binlog.GetLogPath() != "" { + return fmt.Errorf("fieldBinlog no need to store logpath, binlog:%v", binlog) + } } return nil }