enhance: add ut for clustering_compactor (#34852)
issue: #34792

Signed-off-by: wayblink <[email protected]>
wayblink authored Jul 21, 2024
1 parent c2b8b5f commit d294fdd
Showing 2 changed files with 148 additions and 22 deletions.
34 changes: 19 additions & 15 deletions internal/datanode/compaction/clustering_compactor.go
@@ -132,14 +132,10 @@ func NewClusteringCompactionTask(
plan *datapb.CompactionPlan,
) *clusteringCompactionTask {
ctx, cancel := context.WithCancel(ctx)
- logIDAlloc := allocator.NewLocalAllocator(plan.GetBeginLogID(), math.MaxInt64)
- segIDAlloc := allocator.NewLocalAllocator(plan.GetPreAllocatedSegments().GetBegin(), plan.GetPreAllocatedSegments().GetEnd())
return &clusteringCompactionTask{
ctx: ctx,
cancel: cancel,
binlogIO: binlogIO,
- logIDAlloc: logIDAlloc,
- segIDAlloc: segIDAlloc,
plan: plan,
tr: timerecord.NewTimeRecorder("clustering_compaction"),
done: make(chan struct{}, 1),
@@ -179,12 +175,23 @@ func (t *clusteringCompactionTask) GetCollection() int64 {
}

func (t *clusteringCompactionTask) init() error {
+ if t.plan.GetType() != datapb.CompactionType_ClusteringCompaction {
+ return merr.WrapErrIllegalCompactionPlan("illegal compaction type")
+ }
+ if len(t.plan.GetSegmentBinlogs()) == 0 {
+ return merr.WrapErrIllegalCompactionPlan("empty segment binlogs")
+ }
t.collectionID = t.GetCollection()
t.partitionID = t.plan.GetSegmentBinlogs()[0].GetPartitionID()
+
+ logIDAlloc := allocator.NewLocalAllocator(t.plan.GetBeginLogID(), math.MaxInt64)
+ segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegments().GetBegin(), t.plan.GetPreAllocatedSegments().GetEnd())
+ t.logIDAlloc = logIDAlloc
+ t.segIDAlloc = segIDAlloc
+
var pkField *schemapb.FieldSchema
if t.plan.Schema == nil {
- return errors.New("empty schema in compactionPlan")
+ return merr.WrapErrIllegalCompactionPlan("empty schema in compactionPlan")
}
for _, field := range t.plan.Schema.Fields {
if field.GetIsPrimaryKey() && field.GetFieldID() >= 100 && typeutil.IsPrimaryFieldType(field.GetDataType()) {
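Note: both allocators are now constructed in init() instead of the constructor, so a malformed plan fails before any IDs are handed out; logIDAlloc is effectively unbounded (end = math.MaxInt64) while segIDAlloc is confined to the plan's pre-allocated segment range. A minimal runnable sketch of that range-bound pattern, with a hypothetical localAllocator standing in for the real allocator.NewLocalAllocator:

    package main

    import (
        "errors"
        "fmt"
    )

    // localAllocator hands out IDs from a pre-reserved half-open range [next, end).
    type localAllocator struct {
        next, end int64
    }

    func (a *localAllocator) allocOne() (int64, error) {
        if a.next >= a.end {
            return 0, errors.New("pre-allocated ID range exhausted")
        }
        id := a.next
        a.next++
        return id, nil
    }

    func main() {
        // Seeded the way init() seeds segIDAlloc: begin/end from plan.GetPreAllocatedSegments().
        a := &localAllocator{next: 100, end: 103}
        for i := 0; i < 4; i++ {
            id, err := a.allocOne()
            fmt.Println(id, err) // the fourth call errors: only three segment IDs were reserved
        }
    }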
@@ -209,22 +216,19 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, fmt.Sprintf("clusteringCompaction-%d", t.GetPlanID()))
defer span.End()
log := log.With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String()))
- if t.plan.GetType() != datapb.CompactionType_ClusteringCompaction {
- // this shouldn't be reached
- log.Warn("compact wrong, illegal compaction type")
- return nil, merr.WrapErrIllegalCompactionPlan()
+ // 0, verify and init
+ err := t.init()
+ if err != nil {
+ log.Error("compaction task init failed", zap.Error(err))
+ return nil, err
}

if !funcutil.CheckCtxValid(ctx) {
log.Warn("compact wrong, task context done or timeout")
return nil, ctx.Err()
}
ctxTimeout, cancelAll := context.WithTimeout(ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
defer cancelAll()

- err := t.init()
- if err != nil {
- return nil, err
- }
defer t.cleanUp(ctx)

// 1, download delta logs to build deltaMap
Expand Down Expand Up @@ -1031,7 +1035,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
// Unable to deal with all empty segments cases, so return error
if binlogNum == 0 {
log.Warn("compact wrong, all segments' binlogs are empty")
- return nil, merr.WrapErrIllegalCompactionPlan()
+ return nil, merr.WrapErrIllegalCompactionPlan("all segments' binlogs are empty")
}
log.Debug("binlogNum", zap.Int("binlogNum", binlogNum))
for idx := 0; idx < binlogNum; idx++ {
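Because every validation failure above is now wrapped by merr.WrapErrIllegalCompactionPlan, callers can branch on error identity rather than message text. A caller-side sketch, with runCompaction a hypothetical helper; it assumes only that merr-wrapped errors match errors.Is, which the new tests below rely on:

    // Hypothetical caller; errors is github.com/cockroachdb/errors,
    // merr and datapb as imported in the files above.
    func runCompaction(t *clusteringCompactionTask) (*datapb.CompactionPlanResult, error) {
        result, err := t.Compact()
        if errors.Is(err, merr.ErrIllegalCompactionPlan) {
            // The plan itself is invalid (wrong type, no binlogs, nil schema);
            // retrying the same plan cannot succeed, so fail it permanently.
            return nil, err
        }
        return result, err
    }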
136 changes: 129 additions & 7 deletions internal/datanode/compaction/clustering_compactor_test.go
@@ -19,17 +19,26 @@ package compaction
import (
"context"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus-storage/go/common/log"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

func TestClusteringCompactionTaskSuite(t *testing.T) {
@@ -40,6 +49,9 @@ type ClusteringCompactionTaskSuite struct {
suite.Suite

mockBinlogIO *io.MockBinlogIO
+ mockAlloc *allocator.MockAllocator
+ mockID atomic.Int64
+ segWriter *SegmentWriter

task *clusteringCompactionTask

@@ -53,6 +65,21 @@ func (s *ClusteringCompactionTaskSuite) SetupSuite() {
func (s *ClusteringCompactionTaskSuite) SetupTest() {
s.mockBinlogIO = io.NewMockBinlogIO(s.T())

+ s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Maybe()
+
+ s.mockAlloc = allocator.NewMockAllocator(s.T())
+ s.mockID.Store(time.Now().UnixMilli())
+ s.mockAlloc.EXPECT().Alloc(mock.Anything).RunAndReturn(func(x uint32) (int64, int64, error) {
+ start := s.mockID.Load()
+ end := s.mockID.Add(int64(x))
+ log.Info("wayblink", zap.Int64("start", start), zap.Int64("end", end))
+ return start, end, nil
+ }).Maybe()
+ s.mockAlloc.EXPECT().AllocOne().RunAndReturn(func() (int64, error) {
+ end := s.mockID.Add(1)
+ return end, nil
+ }).Maybe()
+
s.task = NewClusteringCompactionTask(context.Background(), s.mockBinlogIO, nil)

paramtable.Get().Save(paramtable.Get().CommonCfg.EntityExpirationTTL.Key, "0")
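The mock allocator above serves every request from a single atomic counter seeded with the current wall-clock milliseconds, so ranges from Alloc and singles from AllocOne never collide as long as calls are serialized, as they are here. A standalone sketch of the same pattern (illustrative values; uses go.uber.org/atomic like the test file):

    package main

    import (
        "fmt"

        "go.uber.org/atomic"
    )

    func main() {
        var id atomic.Int64
        id.Store(1000) // the suite seeds with time.Now().UnixMilli()

        // Alloc(5): reserve the half-open range [start, end).
        // Like the mock, this Load/Add pair assumes calls are serialized.
        start, end := id.Load(), id.Add(5)
        fmt.Println(start, end) // 1000 1005

        // AllocOne: one fresh ID past every range handed out so far.
        fmt.Println(id.Add(1)) // 1006
    }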
@@ -98,19 +125,103 @@ func (s *ClusteringCompactionTaskSuite) TestContextDown() {

func (s *ClusteringCompactionTaskSuite) TestIsVectorClusteringKey() {
s.task.plan.Schema = genCollectionSchema()
- s.task.plan.ClusteringKeyField = Int32Field
+ s.task.plan.ClusteringKeyField = 100
s.task.init()
s.Equal(false, s.task.isVectorClusteringKey)
- s.task.plan.ClusteringKeyField = FloatVectorField
+ s.task.plan.ClusteringKeyField = 103
s.task.init()
s.Equal(true, s.task.isVectorClusteringKey)
}

- func (s *ClusteringCompactionTaskSuite) TestGetScalarResult() {
+ func (s *ClusteringCompactionTaskSuite) TestCompactionWithEmptyBinlog() {
s.task.plan.Schema = genCollectionSchema()
- s.task.plan.ClusteringKeyField = Int32Field
+ s.task.plan.ClusteringKeyField = 100
_, err := s.task.Compact()
s.Require().Error(err)
+ s.Equal(true, errors.Is(err, merr.ErrIllegalCompactionPlan))
+ s.task.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{}
+ _, err2 := s.task.Compact()
+ s.Require().Error(err2)
+ s.Equal(true, errors.Is(err2, merr.ErrIllegalCompactionPlan))
}
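Both calls funnel into the same len(t.plan.GetSegmentBinlogs()) == 0 guard in init(): the first runs with SegmentBinlogs still nil, the second with an explicitly empty slice, and len of a nil slice is 0 in Go, so each yields the wrapped ErrIllegalCompactionPlan.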

+ func (s *ClusteringCompactionTaskSuite) TestCompactionWithEmptySchema() {
+ s.task.plan.ClusteringKeyField = 100
+ _, err := s.task.Compact()
+ s.Require().Error(err)
+ s.Equal(true, errors.Is(err, merr.ErrIllegalCompactionPlan))
+ }
+
+ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() {
+ s.task.plan.Schema = genCollectionSchema()
+ s.task.plan.ClusteringKeyField = 100
+ s.task.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
+ {
+ SegmentID: 100,
+ },
+ }
+ err := s.task.init()
+ s.Require().NoError(err)
+ s.Equal(s.task.primaryKeyField, s.task.plan.Schema.Fields[2])
+ s.Equal(false, s.task.isVectorClusteringKey)
+ s.Equal(true, s.task.memoryBufferSize > 0)
+ s.Equal(8, s.task.getWorkerPoolSize())
+ s.Equal(8, s.task.mappingPool.Cap())
+ s.Equal(8, s.task.flushPool.Cap())
+ }
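The expected value 8 for the worker pool and both task pools presumably pins the paramtable default for clustering-compaction concurrency, and memoryBufferSize > 0 likewise reflects a configured default rather than anything in the plan, so a config change would surface in this test.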

+ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
+ schema := genCollectionSchema()
+ var segmentID int64 = 1001
+ segWriter, err := NewSegmentWriter(schema, 1000, segmentID, PartitionID, CollectionID)
+ s.Require().NoError(err)
+
+ for i := 0; i < 1000; i++ {
+ v := storage.Value{
+ PK: storage.NewInt64PrimaryKey(int64(i)),
+ Timestamp: int64(tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)),
+ Value: genRow(int64(i)),
+ }
+ err = segWriter.Write(&v)
+ s.Require().NoError(err)
+ }
+ segWriter.writer.Flush()
+
+ kvs, fBinlogs, err := serializeWrite(context.TODO(), s.mockAlloc, segWriter)
+ s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(lo.Values(kvs), nil)
+
+ s.plan.SegmentBinlogs = []*datapb.CompactionSegmentBinlogs{
+ {
+ SegmentID: 100,
+ FieldBinlogs: lo.Values(fBinlogs),
+ },
+ }
+
+ s.task.plan.Schema = genCollectionSchema()
+ s.task.plan.ClusteringKeyField = 100
+ s.task.plan.PreferSegmentRows = 100
+ s.task.plan.MaxSegmentRows = 200
+ s.task.plan.PreAllocatedSegments = &datapb.IDRange{
+ Begin: time.Now().UnixMilli(),
+ End: time.Now().UnixMilli() + 1000,
+ }
+
+ compactionResult, err := s.task.Compact()
+ s.Require().NoError(err)
+ s.Equal(10, len(s.task.clusterBuffers))
+ s.Equal(10, len(compactionResult.GetSegments()))
+ }
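The asserted counts follow from the setup arithmetic: 1000 sequential pks clustered with PreferSegmentRows = 100 should split into roughly 1000 / 100 = 10 clusters, hence 10 cluster buffers and 10 result segments, with MaxSegmentRows = 200 never forcing an extra split for evenly distributed keys.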

+ func genRow(magic int64) map[int64]interface{} {
+ ts := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)
+ return map[int64]interface{}{
+ common.RowIDField: magic,
+ common.TimeStampField: int64(ts),
+ 100: magic,
+ 101: int32(magic),
+ 102: "varchar",
+ 103: []float32{4, 5, 6, 7},
+ }
+ }
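Each key in the row map is a FieldID from genCollectionSchema below: the system fields common.RowIDField and common.TimeStampField, then 100 (pk, int64), 101 (int32), 102 (varchar), and 103 (a 4-element float vector).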

func genCollectionSchema() *schemapb.CollectionSchema {
@@ -124,12 +235,23 @@ func genCollectionSchema() *schemapb.CollectionSchema {
DataType: schemapb.DataType_Int64,
},
{
- FieldID: Int32Field,
+ FieldID: common.TimeStampField,
+ Name: "Timestamp",
+ DataType: schemapb.DataType_Int64,
+ },
+ {
+ FieldID: 100,
+ Name: "pk",
+ DataType: schemapb.DataType_Int64,
+ IsPrimaryKey: true,
+ },
+ {
+ FieldID: 101,
Name: "field_int32",
DataType: schemapb.DataType_Int32,
},
{
- FieldID: VarCharField,
+ FieldID: 102,
Name: "field_varchar",
DataType: schemapb.DataType_VarChar,
TypeParams: []*commonpb.KeyValuePair{
Expand All @@ -140,7 +262,7 @@ func genCollectionSchema() *schemapb.CollectionSchema {
},
},
{
- FieldID: FloatVectorField,
+ FieldID: 103,
Name: "field_float_vector",
Description: "float_vector",
DataType: schemapb.DataType_FloatVector,
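To run the new cases locally, the suite can presumably be invoked from the repository root with the standard Go tooling, using the package path from the file headers above:

    go test ./internal/datanode/compaction/ -run TestClusteringCompactionTaskSuite -v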
