Skip to content

Commit

Permalink
fix: unit test case
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Dec 5, 2024
1 parent 205006c commit bc57f7d
Show file tree
Hide file tree
Showing 14 changed files with 120 additions and 60 deletions.
5 changes: 3 additions & 2 deletions internal/proxy/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1974,6 +1974,7 @@ func TestReplicateMessageForCollectionMode(t *testing.T) {

mockCache := NewMockCache(t)
mockCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Twice()
mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{}, nil).Twice()
globalMetaCache = mockCache

{
Expand Down Expand Up @@ -2185,8 +2186,8 @@ func TestAlterCollectionReplicateProperty(t *testing.T) {
CollectionName: "foo_collection",
Properties: []*commonpb.KeyValuePair{
{
Key: "replicate.enable",
Value: "false",
Key: "replicate.endTS",
Value: "1",
},
},
})
Expand Down
20 changes: 19 additions & 1 deletion internal/proxy/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4276,7 +4276,7 @@ func TestAlterCollectionForReplicateProperty(t *testing.T) {
assert.Error(t, err)
})

t.Run("fail to alloc ts", func(t *testing.T) {
t.Run("invalid replicate id", func(t *testing.T) {
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
CollectionName: "test",
Expand All @@ -4290,6 +4290,24 @@ func TestAlterCollectionForReplicateProperty(t *testing.T) {
rootCoord: mockRootcoord,
}

err := task.PreExecute(ctx)
assert.Error(t, err)
})

t.Run("fail to alloc ts", func(t *testing.T) {
task := &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
CollectionName: "test",
Properties: []*commonpb.KeyValuePair{
{
Key: common.ReplicateEndTSKey,
Value: "100",
},
},
},
rootCoord: mockRootcoord,
}

mockRootcoord.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(nil, errors.New("err")).Once()
err := task.PreExecute(ctx)
assert.Error(t, err)
Expand Down
2 changes: 2 additions & 0 deletions internal/querycoordv2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
coordMocks "github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/dist"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
Expand Down Expand Up @@ -602,6 +603,7 @@ func (suite *ServerSuite) hackServer() {
)

suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{Schema: &schemapb.CollectionSchema{}}, nil).Maybe()
suite.broker.EXPECT().DescribeDatabase(mock.Anything, mock.Anything).Return(&rootcoordpb.DescribeDatabaseResponse{}, nil).Maybe()
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
for _, collection := range suite.collections {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe()
Expand Down
2 changes: 2 additions & 0 deletions internal/querycoordv2/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
Expand Down Expand Up @@ -230,6 +231,7 @@ func (suite *TaskSuite) TestSubscribeChannelTask() {
},
}, nil
})
suite.broker.EXPECT().DescribeDatabase(mock.Anything, mock.Anything).Return(&rootcoordpb.DescribeDatabaseResponse{}, nil)
for channel, segment := range suite.growingSegments {
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).
Return([]*datapb.SegmentInfo{
Expand Down
3 changes: 2 additions & 1 deletion internal/querynodev2/metrics_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/json"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
Expand All @@ -49,7 +50,7 @@ func TestGetPipelineJSON(t *testing.T) {

collectionManager := segments.NewMockCollectionManager(t)
segmentManager := segments.NewMockSegmentManager(t)
collectionManager.EXPECT().Get(mock.Anything).Return(&segments.Collection{})
collectionManager.EXPECT().Get(mock.Anything).Return(segments.NewTestCollection(1, querypb.LoadType_UnKnownType, &schemapb.CollectionSchema{}))
manager := &segments.Manager{
Collection: collectionManager,
Segment: segmentManager,
Expand Down
4 changes: 2 additions & 2 deletions internal/querynodev2/pipeline/filter_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (suite *FilterNodeSuite) TestWithLoadCollection() {
suite.validSegmentIDs = []int64{2, 3, 4, 5, 6}

// mock
collection := segments.NewCollectionWithoutSchema(suite.collectionID, querypb.LoadType_LoadCollection)
collection := segments.NewTestCollection(suite.collectionID, querypb.LoadType_LoadCollection, nil)
for _, partitionID := range suite.partitionIDs {
collection.AddPartition(partitionID)
}
Expand Down Expand Up @@ -111,7 +111,7 @@ func (suite *FilterNodeSuite) TestWithLoadPartation() {
suite.validSegmentIDs = []int64{2, 3, 4, 5, 6}

// mock
collection := segments.NewCollectionWithoutSchema(suite.collectionID, querypb.LoadType_LoadPartition)
collection := segments.NewTestCollection(suite.collectionID, querypb.LoadType_LoadPartition, nil)
collection.AddPartition(suite.partitionIDs[0])

mockCollectionManager := segments.NewMockCollectionManager(suite.T())
Expand Down
4 changes: 3 additions & 1 deletion internal/querynodev2/pipeline/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/delegator"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
Expand Down Expand Up @@ -77,7 +79,7 @@ func (suite *PipelineManagerTestSuite) SetupTest() {
func (suite *PipelineManagerTestSuite) TestBasic() {
// init mock
// mock collection manager
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(&segments.Collection{})
suite.collectionManager.EXPECT().Get(suite.collectionID).Return(segments.NewTestCollection(suite.collectionID, querypb.LoadType_UnKnownType, &schemapb.CollectionSchema{}))
// mock mq factory
suite.msgDispatcher.EXPECT().Register(mock.Anything, mock.Anything).Return(suite.msgChan, nil)
suite.msgDispatcher.EXPECT().Deregister(suite.channel)
Expand Down
7 changes: 5 additions & 2 deletions internal/querynodev2/segments/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,16 @@ func NewCollection(collectionID int64, schema *schemapb.CollectionSchema, indexM
return coll
}

func NewCollectionWithoutSchema(collectionID int64, loadType querypb.LoadType) *Collection {
return &Collection{
// Only for test
func NewTestCollection(collectionID int64, loadType querypb.LoadType, schema *schemapb.CollectionSchema) *Collection {
col := &Collection{
id: collectionID,
partitions: typeutil.NewConcurrentSet[int64](),
loadType: loadType,
refCount: atomic.NewUint32(0),
}
col.schema.Store(schema)
return col
}

// new collection without segcore prepare
Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/alter_database_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func Test_alterDatabaseTask_Execute(t *testing.T) {
mock.Anything,
).Return(nil)

core := newTestCore(withMeta(meta))
core := newTestCore(withMeta(meta), withValidProxyManager())
task := &alterDatabaseTask{
baseTask: newBaseTask(context.Background(), core),
Req: &rootcoordpb.AlterDatabaseRequest{
Expand Down
81 changes: 81 additions & 0 deletions pkg/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,84 @@ func TestShouldFieldBeLoaded(t *testing.T) {
})
}
}

func TestReplicateProperty(t *testing.T) {
t.Run("ReplicateID", func(t *testing.T) {
{
p := []*commonpb.KeyValuePair{
{
Key: ReplicateIDKey,
Value: "1001",
},
}
e, ok := IsReplicateEnabled(p)
assert.True(t, e)
assert.True(t, ok)
i, ok := GetReplicateID(p)
assert.True(t, ok)
assert.Equal(t, "1001", i)
}

{
p := []*commonpb.KeyValuePair{
{
Key: ReplicateIDKey,
Value: "",
},
}
e, ok := IsReplicateEnabled(p)
assert.False(t, e)
assert.True(t, ok)
}

{
p := []*commonpb.KeyValuePair{
{
Key: "foo",
Value: "1001",
},
}
e, ok := IsReplicateEnabled(p)
assert.False(t, e)
assert.False(t, ok)
}
})

t.Run("ReplicateTS", func(t *testing.T) {
{
p := []*commonpb.KeyValuePair{
{
Key: ReplicateEndTSKey,
Value: "1001",
},
}
ts, ok := GetReplicateEndTS(p)
assert.True(t, ok)
assert.EqualValues(t, 1001, ts)
}

{
p := []*commonpb.KeyValuePair{
{
Key: ReplicateEndTSKey,
Value: "foo",
},
}
ts, ok := GetReplicateEndTS(p)
assert.False(t, ok)
assert.EqualValues(t, 0, ts)
}

{
p := []*commonpb.KeyValuePair{
{
Key: "foo",
Value: "1001",
},
}
ts, ok := GetReplicateEndTS(p)
assert.False(t, ok)
assert.EqualValues(t, 0, ts)
}
})
}
7 changes: 0 additions & 7 deletions pkg/mq/msgdispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,13 +336,6 @@ func (d *Dispatcher) groupingMsgs(pack *MsgPack) map[string]*MsgPack {
}
newMsgs = append(newMsgs, msg)
}
log.Info("fubang messages",
zap.String("vchannel", vchannel),
zap.Any("beginTs", beginTs),
zap.Any("endTs", endTs),
zap.Any("oldMsgs", targetPacks[vchannel].Msgs),
zap.Any("newMsgs", newMsgs),
)
targetPacks[vchannel].Msgs = newMsgs
d.resetMsgPackTS(targetPacks[vchannel], beginTs, endTs)
}
Expand Down
33 changes: 0 additions & 33 deletions pkg/mq/msgstream/mock_msgstream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions pkg/mq/msgstream/mq_msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,15 +272,6 @@ func (ms *mqMsgStream) EnableProduce(can bool) {
ms.enableProduce.Store(can)
}

// SetReplicate not safe, please call it only onece before produce or consume
func (ms *mqMsgStream) SetReplicate(config *ReplicateConfig) {
if config == nil {
return
}
ms.replicateID = config.ReplicateID
ms.checkFunc = config.CheckFunc
}

func (ms *mqMsgStream) isEnabledProduce() bool {
return ms.enableProduce.Load().(bool)
}
Expand Down
1 change: 0 additions & 1 deletion pkg/mq/msgstream/msgstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type MsgStream interface {
CheckTopicValid(channel string) error

EnableProduce(can bool)
SetReplicate(config *ReplicateConfig)
}

type ReplicateConfig struct {
Expand Down

0 comments on commit bc57f7d

Please sign in to comment.