Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 committed Nov 4, 2024
1 parent 49170c4 commit 0955e28
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 32 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (t *clusteringCompactionTask) processIndexing() error {
func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
var operators []UpdateOperator
for _, segID := range t.GetTaskProto().GetResultSegments() {
operators = append(operators, SetSegmentInvisible(segID, false))
operators = append(operators, SetSegmentIsInvisible(segID, false))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID()))
}

Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// Skip bulk insert segments.
continue
}
if s.GetIsInvisible() && s.GetLevel() != datapb.SegmentLevel_L1 {
// skip invisible segments
if s.GetIsInvisible() && s.GetCreatedByCompaction() {
// skip invisible compaction segments
continue
}

Expand Down
59 changes: 35 additions & 24 deletions internal/datacoord/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8))
assert.NoError(t, err)
Expand All @@ -719,8 +720,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9))
assert.NoError(t, err)
Expand All @@ -736,8 +738,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg10))
assert.NoError(t, err)
Expand All @@ -753,8 +756,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{4, 5, 6},
NumOfRows: 2048,
CompactionFrom: []int64{4, 5, 6},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg11))
assert.NoError(t, err)
Expand All @@ -770,8 +774,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{4, 5, 6},
NumOfRows: 100,
CompactionFrom: []int64{4, 5, 6},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg12))
assert.NoError(t, err)
Expand All @@ -787,8 +792,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2047,
CompactionFrom: []int64{7, 8, 9},
NumOfRows: 2047,
CompactionFrom: []int64{7, 8, 9},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg13))
assert.NoError(t, err)
Expand All @@ -804,8 +810,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{10, 11},
NumOfRows: 100,
CompactionFrom: []int64{10, 11},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg14))
assert.NoError(t, err)
Expand All @@ -821,8 +828,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{10, 11},
NumOfRows: 2048,
CompactionFrom: []int64{10, 11},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg15))
assert.NoError(t, err)
Expand All @@ -838,8 +846,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{13, 14},
NumOfRows: 2048,
CompactionFrom: []int64{13, 14},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg16))
assert.NoError(t, err)
Expand All @@ -855,8 +864,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{12, 15},
NumOfRows: 2048,
CompactionFrom: []int64{12, 15},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg17))
assert.NoError(t, err)
Expand Down Expand Up @@ -913,10 +923,11 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{19},
IsInvisible: false,
IsSorted: true,
NumOfRows: 2048,
CompactionFrom: []int64{19},
CreatedByCompaction: true,
IsInvisible: false,
IsSorted: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg20))
assert.NoError(t, err)
Expand Down
8 changes: 4 additions & 4 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,15 +791,15 @@ func UpdateCompactedOperator(segmentID int64) UpdateOperator {
}
}

func SetSegmentInvisible(segmentID int64, invisible bool) UpdateOperator {
func SetSegmentIsInvisible(segmentID int64, isInvisible bool) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
log.Warn("meta update: update segment visible fail - segment not found",
zap.Int64("segmentID", segmentID))
return false
}
segment.IsInvisible = invisible
segment.IsInvisible = isInvisible
return true
}
}
Expand Down Expand Up @@ -1980,7 +1980,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)

resultInvisible := oldSegment.GetIsInvisible()
if oldSegment.GetLevel() == datapb.SegmentLevel_L1 {
if !oldSegment.GetCreatedByCompaction() {
resultInvisible = false
}

Expand All @@ -1999,6 +1999,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
LastLevel: oldSegment.GetLastLevel(),
PartitionStatsVersion: oldSegment.GetPartitionStatsVersion(),
LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(),
CreatedByCompaction: oldSegment.GetCreatedByCompaction(),
IsInvisible: resultInvisible,
ID: result.GetSegmentID(),
NumOfRows: result.GetNumRows(),
Expand All @@ -2007,7 +2008,6 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
TextStatsLogs: result.GetTextStatsLogs(),
Bm25Statslogs: result.GetBm25Logs(),
Deltalogs: nil,
CreatedByCompaction: true,
CompactionFrom: []int64{oldSegmentID},
IsSorted: true,
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
}
// set segment to SegmentState_Flushed
var operators []UpdateOperator
operators = append(operators, SetSegmentInvisible(segmentID, true))
operators = append(operators, SetSegmentIsInvisible(segmentID, true))
operators = append(operators, UpdateStatusOperator(segmentID, commonpb.SegmentState_Flushed))
err := s.meta.UpdateSegmentsInfo(operators...)
if err != nil {
Expand Down

0 comments on commit 0955e28

Please sign in to comment.