Skip to content

Commit

Permalink
fix store deltaRowcount
Browse files Browse the repository at this point in the history
Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Nov 1, 2024
1 parent ad04c2f commit b88de14
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 5 deletions.
4 changes: 4 additions & 0 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,10 @@ func AddBinlogsOperator(segmentID int64, binlogs, statslogs, deltalogs []*datapb
segment.Binlogs = mergeFieldBinlogs(segment.GetBinlogs(), binlogs)
segment.Statslogs = mergeFieldBinlogs(segment.GetStatslogs(), statslogs)
segment.Deltalogs = mergeFieldBinlogs(segment.GetDeltalogs(), deltalogs)
if len(deltalogs) > 0 {
segment.deltaRowcount.Store(-1)
}

modPack.increments[segmentID] = metastore.BinlogsIncrement{
Segment: segment.SegmentInfo,
}
Expand Down
10 changes: 8 additions & 2 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -727,13 +728,15 @@ func TestUpdateSegmentsInfo(t *testing.T) {
meta, err := newMemoryMeta()
assert.NoError(t, err)

segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
segment1 := NewSegmentInfo(&datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Growing,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
}}
})
err = meta.AddSegment(context.TODO(), segment1)
assert.NoError(t, err)
require.EqualValues(t, -1, segment1.deltaRowcount.Load())
assert.EqualValues(t, 0, segment1.getDeltaCount())

err = meta.UpdateSegmentsInfo(
UpdateStatusOperator(1, commonpb.SegmentState_Flushing),
Expand All @@ -748,6 +751,9 @@ func TestUpdateSegmentsInfo(t *testing.T) {
assert.NoError(t, err)

updated := meta.GetHealthySegment(1)
assert.EqualValues(t, -1, updated.deltaRowcount.Load())
assert.EqualValues(t, 1, updated.getDeltaCount())

expected := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{
ID: 1, State: commonpb.SegmentState_Flushing, NumOfRows: 10,
StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}},
Expand Down
5 changes: 2 additions & 3 deletions internal/datacoord/segment_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,9 @@ func (s *SegmentInfo) getSegmentSize() int64 {
return s.size.Load()
}

// L1 segment deltaCount changes in any state
// L0 segment deltaCount won't change
// Any edits on deltalogs of flushed segments will reset deltaRowcount to -1
func (s *SegmentInfo) getDeltaCount() int64 {
if s.deltaRowcount.Load() < 0 || s.GetLevel() != datapb.SegmentLevel_L0 {
if s.deltaRowcount.Load() < 0 || s.GetState() != commonpb.SegmentState_Flushed {
var rc int64
for _, deltaLogs := range s.GetDeltalogs() {
for _, l := range deltaLogs.GetBinlogs() {
Expand Down

0 comments on commit b88de14

Please sign in to comment.