Skip to content

Commit

Permalink
fix: [Cherry-pick] Cleanup write buffer when flowgraph released (#31377)
Browse files Browse the repository at this point in the history
Cherry-pick from master
pr: #31376
See also #30137

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Mar 18, 2024
1 parent b2b107a commit 3254a14
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 8 deletions.
24 changes: 24 additions & 0 deletions internal/datanode/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (

"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand All @@ -41,6 +44,7 @@ type ChannelManagerSuite struct {
func (s *ChannelManagerSuite) SetupTest() {
ctx := context.Background()
s.node = newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
s.node.allocator = allocator.NewMockAllocator(s.T())
s.manager = NewChannelManager(s.node)
}

Expand All @@ -52,6 +56,26 @@ func getWatchInfoByOpID(opID UniqueID, channel string, state datapb.ChannelWatch
CollectionID: 1,
ChannelName: channel,
},
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (node *DataNode) handleChannelEvt(evt *clientv3.Event) {
func (node *DataNode) tryToReleaseFlowgraph(vChanName string) {
log.Info("try to release flowgraph", zap.String("vChanName", vChanName))
node.flowgraphManager.RemoveFlowgraph(vChanName)
node.writeBufferManager.RemoveChannel(vChanName)
}

// BackGroundGC runs in background to release datanode resources
Expand Down
24 changes: 23 additions & 1 deletion internal/datanode/data_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/importutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
Expand Down Expand Up @@ -206,7 +207,28 @@ func TestDataNode(t *testing.T) {
}

for _, test := range testDataSyncs {
err = node.flowgraphManager.AddandStartWithEtcdTickler(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName}, nil, genTestTickler())
err = node.flowgraphManager.AddandStartWithEtcdTickler(node, &datapb.VchannelInfo{
CollectionID: 1, ChannelName: test.dmChannelName,
}, &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
}, genTestTickler())
assert.NoError(t, err)
vchanNameCh <- test.dmChannelName
}
Expand Down
6 changes: 5 additions & 1 deletion internal/datanode/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,11 @@ func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb
resendTTCh = make(chan resendTTMsg, 100)
)

node.writeBufferManager.Register(channelName, metacache, storageV2Cache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(node.broker, config.serverID)), writebuffer.WithIDAllocator(node.allocator))
err := node.writeBufferManager.Register(channelName, metacache, storageV2Cache, writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(node.broker, config.serverID)), writebuffer.WithIDAllocator(node.allocator))
if err != nil {
log.Warn("failed to register channel buffer", zap.Error(err))
return nil, err
}
ctx, cancel := context.WithCancel(node.ctx)
ds := &dataSyncService{
ctx: ctx,
Expand Down
55 changes: 55 additions & 0 deletions internal/datanode/data_sync_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,26 @@ func init() {
func getWatchInfo(info *testInfo) *datapb.ChannelWatchInfo {
return &datapb.ChannelWatchInfo{
Vchan: getVchanInfo(info),
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
}

Expand Down Expand Up @@ -157,10 +177,12 @@ func TestDataSyncService_newDataSyncService(t *testing.T) {
defer cm.RemoveWithPrefix(ctx, cm.RootPath())

node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
node.allocator = allocator.NewMockAllocator(t)

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
node.factory = test.inMsgFactory
defer node.tryToReleaseFlowgraph(test.chanName)
ds, err := newServiceWithEtcdTickler(
ctx,
node,
Expand All @@ -183,6 +205,39 @@ func TestDataSyncService_newDataSyncService(t *testing.T) {
}
}

func TestDataSyncService_newDataSyncService_DuplicatedChannel(t *testing.T) {
ctx := context.Background()

test := &testInfo{
true, false, &mockMsgStreamFactory{true, true},
1, "by-dev-rootcoord-dml-test_v1",
1, 1, "by-dev-rootcoord-dml-test_v1", 0,
1, 2, "by-dev-rootcoord-dml-test_v1", 0,
"add un-flushed and flushed segments",
}
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())

watchInfo := getWatchInfo(test)

node := newIDLEDataNodeMock(ctx, schemapb.DataType_Int64)
node.allocator = allocator.NewMockAllocator(t)
node.factory = test.inMsgFactory
metacache := metacache.NewMockMetaCache(t)
metacache.EXPECT().Collection().Return(test.collID)
metacache.EXPECT().Schema().Return(watchInfo.GetSchema())
node.writeBufferManager.Register(test.chanName, metacache, nil, writebuffer.WithIDAllocator(allocator.NewMockAllocator(t)))
ds, err := newServiceWithEtcdTickler(
ctx,
node,
watchInfo,
genTestTickler(),
)

assert.Error(t, err)
assert.Nil(t, ds)
}

func genBytes() (rawData []byte) {
const DIM = 2
const N = 1
Expand Down
142 changes: 142 additions & 0 deletions internal/datanode/event_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/broker"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
Expand Down Expand Up @@ -92,6 +94,26 @@ func TestWatchChannel(t *testing.T) {
info := &datapb.ChannelWatchInfo{
State: datapb.ChannelWatchState_ToWatch,
Vchan: vchan,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
val, err := proto.Marshal(info)
assert.NoError(t, err)
Expand Down Expand Up @@ -162,6 +184,26 @@ func TestWatchChannel(t *testing.T) {
info := &datapb.ChannelWatchInfo{
State: datapb.ChannelWatchState_ToRelease,
Vchan: vchan,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
val, err := proto.Marshal(info)
assert.NoError(t, err)
Expand Down Expand Up @@ -195,6 +237,26 @@ func TestWatchChannel(t *testing.T) {
info := datapb.ChannelWatchInfo{
Vchan: nil,
State: datapb.ChannelWatchState_Uncomplete,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
bs, err := proto.Marshal(&info)
assert.NoError(t, err)
Expand Down Expand Up @@ -225,6 +287,26 @@ func TestWatchChannel(t *testing.T) {
info = datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{ChannelName: ch},
State: datapb.ChannelWatchState_Uncomplete,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
bs, err = proto.Marshal(&info)
assert.NoError(t, err)
Expand Down Expand Up @@ -268,6 +350,26 @@ func TestWatchChannel(t *testing.T) {
info := datapb.ChannelWatchInfo{
Vchan: &datapb.VchannelInfo{ChannelName: ch},
State: datapb.ChannelWatchState_Uncomplete,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
bs, err := proto.Marshal(&info)
assert.NoError(t, err)
Expand All @@ -289,6 +391,26 @@ func TestWatchChannel(t *testing.T) {
UnflushedSegmentIds: []int64{1},
},
State: datapb.ChannelWatchState_Uncomplete,
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
bs, err := proto.Marshal(&info)
assert.NoError(t, err)
Expand Down Expand Up @@ -449,6 +571,26 @@ func TestEventTickler(t *testing.T) {
Vchan: &datapb.VchannelInfo{
ChannelName: channelName,
},
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}, kv, 100*time.Millisecond)
defer tickler.stop()
endCh := make(chan struct{}, 1)
Expand Down
Loading

0 comments on commit 3254a14

Please sign in to comment.