Skip to content

Commit

Permalink
enhance: do check when add not empty logpath (milvus-io#33640)
Browse files Browse the repository at this point in the history
meta only store logid

Signed-off-by: lixinguo <[email protected]>
Co-authored-by: lixinguo <[email protected]>
  • Loading branch information
smellthemoon and lixinguo authored Jun 7, 2024
1 parent ecee7d9 commit c61fb1e
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 40 deletions.
4 changes: 2 additions & 2 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
AddBinlogsOperator(1,
[]*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1), LogID: 2}}}},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}},
),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),
Expand Down Expand Up @@ -837,7 +837,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
AddBinlogsOperator(1,
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1), LogID: 2}}}},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}},
),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/segment_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
}

// SetSegment sets SegmentInfo with segmentID, perform overwrite if already exists
// set the logPath of segement in meta empty, to save space
// set the logPath of segment in meta empty, to save space
// if segment has logPath, make it empty
func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
if segment, ok := s.segments[segmentID]; ok {
Expand Down
4 changes: 0 additions & 4 deletions internal/datacoord/segment_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand Down Expand Up @@ -649,7 +648,6 @@ func TestTryToSealSegment(t *testing.T) {
{
EntriesNum: 10,
LogID: 3,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 2, 3),
},
},
},
Expand All @@ -674,12 +672,10 @@ func TestTryToSealSegment(t *testing.T) {
{
EntriesNum: 10,
LogID: 1,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 3),
},
{
EntriesNum: 20,
LogID: 2,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 2),
},
},
},
Expand Down
16 changes: 1 addition & 15 deletions internal/datacoord/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tikv"
Expand Down Expand Up @@ -320,17 +319,14 @@ func TestGetSegmentInfo(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801),
LogID: 801,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802),
LogID: 802,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803),
LogID: 803,
},
},
Expand All @@ -344,10 +340,10 @@ func TestGetSegmentInfo(t *testing.T) {
SegmentIDs: []int64{0},
}
resp, err := svr.GetSegmentInfo(svr.ctx, req)
assert.NoError(t, err)
assert.Equal(t, 1, len(resp.GetInfos()))
// Check that # of rows is corrected from 100 to 60.
assert.EqualValues(t, 60, resp.GetInfos()[0].GetNumOfRows())
assert.NoError(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("with wrong segmentID", func(t *testing.T) {
Expand Down Expand Up @@ -1824,17 +1820,14 @@ func TestGetRecoveryInfo(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903),
LogID: 903,
},
},
Expand All @@ -1847,12 +1840,10 @@ func TestGetRecoveryInfo(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802),
LogID: 802,
},
},
Expand Down Expand Up @@ -1926,17 +1917,14 @@ func TestGetRecoveryInfo(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903),
LogID: 903,
},
},
Expand All @@ -1949,12 +1937,10 @@ func TestGetRecoveryInfo(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802),
LogID: 802,
},
},
Expand Down
22 changes: 4 additions & 18 deletions internal/datacoord/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,17 +1138,14 @@ func TestGetRecoveryInfoV2(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903),
LogID: 903,
},
},
Expand All @@ -1161,12 +1158,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802),
LogID: 802,
},
},
Expand Down Expand Up @@ -1243,17 +1238,14 @@ func TestGetRecoveryInfoV2(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903),
LogID: 903,
},
},
Expand All @@ -1266,12 +1258,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802),
LogID: 802,
},
},
Expand Down Expand Up @@ -1318,12 +1308,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801),
LogID: 801,
LogID: 801,
},
{
LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801),
LogID: 801,
LogID: 801,
},
},
},
Expand All @@ -1333,12 +1321,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000),
LogID: 10000,
LogID: 10000,
},
{
LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000),
LogID: 10000,
LogID: 10000,
},
},
},
Expand Down
47 changes: 47 additions & 0 deletions internal/metastore/kv/datacoord/kv_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,53 @@ func Test_AddSegments(t *testing.T) {
assert.Equal(t, 4, len(savedKvs))
verifySavedKvsForSegment(t, savedKvs)
})

t.Run("no need to store log path", func(t *testing.T) {
metakv := mocks.NewMetaKv(t)
catalog := NewCatalog(metakv, rootPath, "")

validFieldBinlog := []*datapb.FieldBinlog{{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogID: 1,
LogPath: "",
},
},
}}

invalidFieldBinlog := []*datapb.FieldBinlog{{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogID: 1,
LogPath: "no need to store",
},
},
}}

segment := &datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
NumOfRows: 100,
State: commonpb.SegmentState_Flushed,
}

segment.Statslogs = invalidFieldBinlog
err := catalog.AddSegment(context.TODO(), segment)
assert.Error(t, err)
segment.Statslogs = validFieldBinlog

segment.Binlogs = invalidFieldBinlog
err = catalog.AddSegment(context.TODO(), segment)
assert.Error(t, err)
segment.Binlogs = validFieldBinlog

segment.Deltalogs = invalidFieldBinlog
err = catalog.AddSegment(context.TODO(), segment)
assert.Error(t, err)
})
}

func Test_AlterSegments(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions internal/metastore/kv/datacoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binl
if binlog.GetLogID() == 0 {
return fmt.Errorf("invalid log id, binlog:%v", binlog)
}
if binlog.GetLogPath() != "" {
return fmt.Errorf("fieldBinlog no need to store logpath, binlog:%v", binlog)
}
}
return nil
}
Expand Down

0 comments on commit c61fb1e

Please sign in to comment.