diff --git a/internal/datanode/channel_manager_test.go b/internal/datanode/channel_manager_test.go index e0f48bae5fbde..71c2830773d06 100644 --- a/internal/datanode/channel_manager_test.go +++ b/internal/datanode/channel_manager_test.go @@ -22,8 +22,11 @@ import ( "github.com/stretchr/testify/suite" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -41,6 +44,7 @@ type ChannelManagerSuite struct { func (s *ChannelManagerSuite) SetupTest() { ctx := context.Background() s.node = newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) + s.node.allocator = allocator.NewMockAllocator(s.T()) s.manager = NewChannelManager(s.node) } @@ -52,6 +56,26 @@ func getWatchInfoByOpID(opID UniqueID, channel string, state datapb.ChannelWatch CollectionID: 1, ChannelName: channel, }, + Schema: &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, } } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 5942af807856d..96de28a3e3e19 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -318,6 +318,7 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) { func (node *DataNode) tryToReleaseFlowgraph(vChanName string) { log.Info("try to release flowgraph", zap.String("vChanName", vChanName)) node.flowgraphManager.RemoveFlowgraph(vChanName) + node.writeBufferManager.RemoveChannel(vChanName) } // BackGroundGC runs in background to release datanode resources diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index cfff0b9d94b7e..17ec673dff6e7 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/metricsinfo" @@ -206,7 +207,28 @@ func TestDataNode(t *testing.T) { } for _, test := range testDataSyncs { - err = node.flowgraphManager.AddandStartWithEtcdTickler(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler()) + err = node.flowgraphManager.AddandStartWithEtcdTickler(node, &datapb.VchannelInfo{ + CollectionID: 1, ChannelName: test.dmChannelName, + }, &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, genTestTickler()) assert.NoError(t, err) vchanNameCh <- test.dmChannelName } diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 0a27a2f5a518a..d75f20f757f0a 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -350,7 +350,11 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb resendTTCh = make(chan resendTTMsg, 100) ) - node.writeBufferManager.Register(channelName, metacache, storageV2Cache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(node.broker, config.serverID)), writebuffer.WithIDAllocator(node.allocator)) + err := node.writeBufferManager.Register(channelName, metacache, storageV2Cache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(node.broker, config.serverID)), writebuffer.WithIDAllocator(node.allocator)) + if err != nil { + log.Warn("failed to register channel buffer", zap.Error(err)) + return nil, err + } ctx, cancel := context.WithCancel(node.ctx) ds := &dataSyncService{ ctx: ctx, diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index b1e7444588a76..aaba16a3beea5 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -61,6 +61,26 @@ func init() { func getWatchInfo(info *testInfo) *datapb.ChannelWatchInfo { return &datapb.ChannelWatchInfo{ Vchan: getVchanInfo(info), + Schema: &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, } } @@ -157,10 +177,12 @@ func TestDataSyncService_newDataSyncService(t *testing.T) { defer cm.RemoveWithPrefix(ctx, cm.RootPath()) node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) + node.allocator = allocator.NewMockAllocator(t) for _, test := range tests { t.Run(test.description, func(t *testing.T) { node.factory = test.inMsgFactory + defer node.tryToReleaseFlowgraph(test.chanName) ds, err := newServiceWithEtcdTickler( ctx, node, @@ -183,6 +205,39 @@ func TestDataSyncService_newDataSyncService(t *testing.T) { } } +func TestDataSyncService_newDataSyncService_DuplicatedChannel(t *testing.T) { + ctx := context.Background() + + test := &testInfo{ + true, false, &mockMsgStreamFactory{true, true}, + 1, "by-dev-rootcoord-dml-test_v1", + 1, 1, "by-dev-rootcoord-dml-test_v1", 0, + 1, 2, "by-dev-rootcoord-dml-test_v1", 0, + "add un-flushed and flushed segments", + } + cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir)) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) + + watchInfo := getWatchInfo(test) + + node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64) + node.allocator = allocator.NewMockAllocator(t) + node.factory = test.inMsgFactory + metacache := metacache.NewMockMetaCache(t) + metacache.EXPECT().Collection().Return(test.collID) + metacache.EXPECT().Schema().Return(watchInfo.GetSchema()) + node.writeBufferManager.Register(test.chanName, metacache, nil, writebuffer.WithIDAllocator(allocator.NewMockAllocator(t))) + ds, err := newServiceWithEtcdTickler( + ctx, + node, + watchInfo, + genTestTickler(), + ) + + assert.Error(t, err) + assert.Nil(t, ds) +} + func genBytes() (rawData []byte) { const DIM = 2 const N = 1 diff --git a/internal/datanode/event_manager_test.go b/internal/datanode/event_manager_test.go index 02b20104070e9..177626d7b3cb3 100644 --- a/internal/datanode/event_manager_test.go +++ b/internal/datanode/event_manager_test.go @@ -30,10 +30,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/broker" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -92,6 +94,26 @@ func TestWatchChannel(t *testing.T) { info := &datapb.ChannelWatchInfo{ State: datapb.ChannelWatchState_ToWatch, Vchan: vchan, + Schema: &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, } val, err := proto.Marshal(info) assert.NoError(t, err) @@ -162,6 +184,26 @@ func TestWatchChannel(t *testing.T) { info := &datapb.ChannelWatchInfo{ State: datapb.ChannelWatchState_ToRelease, Vchan: vchan, + Schema: &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, } val, err := proto.Marshal(info) assert.NoError(t, err) @@ -195,6 +237,26 @@ func TestWatchChannel(t *testing.T) { info := datapb.ChannelWatchInfo{ Vchan: nil, State: datapb.ChannelWatchState_Uncomplete, + Schema: &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, } bs, err := proto.Marshal(&info) assert.NoError(t, err) @@ -225,6 +287,26 @@ func TestWatchChannel(t *testing.T) { info = datapb.ChannelWatchInfo{ Vchan: &datapb.VchannelInfo{ChannelName: ch}, State: datapb.ChannelWatchState_Uncomplete, + Schema: &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, } bs, err = proto.Marshal(&info) assert.NoError(t, err) @@ -268,6 +350,26 @@ func TestWatchChannel(t *testing.T) { info := datapb.ChannelWatchInfo{ Vchan: &datapb.VchannelInfo{ChannelName: ch}, State: datapb.ChannelWatchState_Uncomplete, + Schema: &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, } bs, err := proto.Marshal(&info) assert.NoError(t, err) @@ -289,6 +391,26 @@ func TestWatchChannel(t *testing.T) { UnflushedSegmentIds: []int64{1}, }, State: datapb.ChannelWatchState_Uncomplete, + Schema: &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, } bs, err := proto.Marshal(&info) assert.NoError(t, err) @@ -449,6 +571,26 @@ func TestEventTickler(t *testing.T) { Vchan: &datapb.VchannelInfo{ ChannelName: channelName, }, + Schema: &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, }, kv, 100*time.Millisecond) defer tickler.stop() endCh := make(chan struct{}, 1) diff --git a/internal/datanode/flow_graph_manager_test.go b/internal/datanode/flow_graph_manager_test.go index ae5f8b8414448..748d3828e5122 100644 --- a/internal/datanode/flow_graph_manager_test.go +++ b/internal/datanode/flow_graph_manager_test.go @@ -24,10 +24,12 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/broker" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -83,7 +85,26 @@ func TestFlowGraphManager(t *testing.T) { } require.False(t, fm.HasFlowgraph(vchanName)) - err := fm.AddandStartWithEtcdTickler(node, vchan, nil, genTestTickler()) + err := fm.AddandStartWithEtcdTickler(node, vchan, &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, genTestTickler()) assert.NoError(t, err) assert.True(t, fm.HasFlowgraph(vchanName)) @@ -98,7 +119,26 @@ func TestFlowGraphManager(t *testing.T) { } require.False(t, fm.HasFlowgraph(vchanName)) - err := fm.AddandStartWithEtcdTickler(node, vchan, nil, genTestTickler()) + err := fm.AddandStartWithEtcdTickler(node, vchan, &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, genTestTickler()) assert.NoError(t, err) assert.True(t, fm.HasFlowgraph(vchanName)) diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 76e0824036670..729d75ff9d121 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -541,14 +541,52 @@ func (s *DataNodeServicesSuite) TestImport() { ChannelName: chName1, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{}, - }, nil, genTestTickler()) + }, &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, genTestTickler()) s.Require().Nil(err) err = s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 100, ChannelName: chName2, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{}, - }, nil, genTestTickler()) + }, &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, genTestTickler()) s.Require().Nil(err) _, ok := s.node.flowgraphManager.GetFlowgraphService(chName1) @@ -604,14 +642,52 @@ func (s *DataNodeServicesSuite) TestImport() { ChannelName: chName1, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{}, - }, nil, genTestTickler()) + }, &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, genTestTickler()) s.Require().Nil(err) err = s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ CollectionID: 999, // wrong collection ID. ChannelName: chName2, UnflushedSegmentIds: []int64{}, FlushedSegmentIds: []int64{}, - }, nil, genTestTickler()) + }, &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64, + }, + { + FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, + }, + { + FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }, + }, + }, + }, genTestTickler()) s.Require().Nil(err) _, ok := s.node.flowgraphManager.GetFlowgraphService(chName1)