Skip to content

Commit

Permalink
enhance: Implement flusher in streamingNode (#34942)
Browse files Browse the repository at this point in the history
- Implement flusher to:
  - Manage the pipelines (creation, deletion, etc.)
  - Manage the segment write buffer
  - Manage sync operation (including receive flushMsg and execute flush)
- Add a new `GetChannelRecoveryInfo` RPC in DataCoord.
- Reorganize packages: `flushcommon` and `datanode`.

issue: #33285

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Aug 2, 2024
1 parent fcec4c2 commit a4439cc
Show file tree
Hide file tree
Showing 83 changed files with 2,031 additions and 591 deletions.
22 changes: 12 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -507,19 +507,21 @@ generate-mockery-datacoord: getdeps

generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage
$(INSTALL_PATH)/mockery --name=Broker --dir=$(PWD)/internal/datanode/broker --output=$(PWD)/internal/datanode/broker/ --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=broker --inpackage
$(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/datanode/metacache --output=$(PWD)/internal/datanode/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage
$(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=MetaWriter --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_meta_writer.go --with-expecter --structname=MockMetaWriter --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=Serializer --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_serializer.go --with-expecter --structname=MockSerializer --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=Task --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_task.go --with-expecter --structname=MockTask --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/datanode/io --output=$(PWD)/internal/datanode/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage
$(INSTALL_PATH)/mockery --name=FlowgraphManager --dir=$(PWD)/internal/datanode/pipeline --output=$(PWD)/internal/datanode/pipeline --filename=mock_fgmanager.go --with-expecter --structname=MockFlowgraphManager --outpkg=pipeline --inpackage
$(INSTALL_PATH)/mockery --name=ChannelManager --dir=$(PWD)/internal/datanode/channel --output=$(PWD)/internal/datanode/channel --filename=mock_channelmanager.go --with-expecter --structname=MockChannelManager --outpkg=channel --inpackage
$(INSTALL_PATH)/mockery --name=Compactor --dir=$(PWD)/internal/datanode/compaction --output=$(PWD)/internal/datanode/compaction --filename=mock_compactor.go --with-expecter --structname=MockCompactor --outpkg=compaction --inpackage

generate-mockery-flushcommon: getdeps
$(INSTALL_PATH)/mockery --name=Broker --dir=$(PWD)/internal/flushcommon/broker --output=$(PWD)/internal/flushcommon/broker/ --filename=mock_broker.go --with-expecter --structname=MockBroker --outpkg=broker --inpackage
$(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/flushcommon/metacache --output=$(PWD)/internal/flushcommon/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage
$(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=MetaWriter --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_meta_writer.go --with-expecter --structname=MockMetaWriter --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=Serializer --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_serializer.go --with-expecter --structname=MockSerializer --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=Task --dir=$(PWD)/internal/flushcommon/syncmgr --output=$(PWD)/internal/flushcommon/syncmgr --filename=mock_task.go --with-expecter --structname=MockTask --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/flushcommon/writebuffer --output=$(PWD)/internal/flushcommon/writebuffer --filename=mock_manager.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BinlogIO --dir=$(PWD)/internal/flushcommon/io --output=$(PWD)/internal/flushcommon/io --filename=mock_binlogio.go --with-expecter --structname=MockBinlogIO --outpkg=io --inpackage
$(INSTALL_PATH)/mockery --name=FlowgraphManager --dir=$(PWD)/internal/flushcommon/pipeline --output=$(PWD)/internal/flushcommon/pipeline --filename=mock_fgmanager.go --with-expecter --structname=MockFlowgraphManager --outpkg=pipeline --inpackage

generate-mockery-metastore: getdeps
$(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks
$(INSTALL_PATH)/mockery --name=DataCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_datacoord_catalog.go --with-expecter --structname=DataCoordCatalog --outpkg=mocks
Expand Down
3 changes: 3 additions & 0 deletions internal/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ packages:
github.com/milvus-io/milvus/internal/streamingnode/client/handler/consumer:
interfaces:
Consumer:
github.com/milvus-io/milvus/internal/streamingnode/server/flusher:
interfaces:
Flusher:
github.com/milvus-io/milvus/internal/streamingnode/server/wal:
interfaces:
OpenerBuilder:
Expand Down
36 changes: 36 additions & 0 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,42 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
return resp, nil
}

// GetChannelRecoveryInfo get recovery channel info.
// Called by: StreamingNode.
func (s *Server) GetChannelRecoveryInfo(ctx context.Context, req *datapb.GetChannelRecoveryInfoRequest) (*datapb.GetChannelRecoveryInfoResponse, error) {
log := log.Ctx(ctx).With(
zap.String("vchannel", req.GetVchannel()),
)
log.Info("get channel recovery info request received")
resp := &datapb.GetChannelRecoveryInfoResponse{
Status: merr.Success(),
}
if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
resp.Status = merr.Status(err)
return resp, nil
}
collectionID := funcutil.GetCollectionIDFromVChannel(req.GetVchannel())
collection, err := s.handler.GetCollection(ctx, collectionID)
if err != nil {
resp.Status = merr.Status(err)
return resp, nil
}
channel := NewRWChannel(req.GetVchannel(), collectionID, nil, collection.Schema, 0) // TODO: remove RWChannel, just use vchannel + collectionID
channelInfo := s.handler.GetDataVChanPositions(channel, allPartitionID)
log.Info("datacoord get channel recovery info",
zap.String("channel", channelInfo.GetChannelName()),
zap.Int("# of unflushed segments", len(channelInfo.GetUnflushedSegmentIds())),
zap.Int("# of flushed segments", len(channelInfo.GetFlushedSegmentIds())),
zap.Int("# of dropped segments", len(channelInfo.GetDroppedSegmentIds())),
zap.Int("# of indexed segments", len(channelInfo.GetIndexedSegmentIds())),
zap.Int("# of l0 segments", len(channelInfo.GetLevelZeroSegmentIds())),
)

resp.Info = channelInfo
resp.Schema = collection.Schema
return resp, nil
}

// GetFlushedSegments returns all segment matches provided criterion and in state Flushed or Dropped (compacted but not GCed yet)
// If requested partition id < 0, ignores the partition id filter
func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
Expand Down
50 changes: 50 additions & 0 deletions internal/datacoord/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,56 @@ func TestImportV2(t *testing.T) {
})
}

func TestGetChannelRecoveryInfo(t *testing.T) {
ctx := context.Background()

// server not healthy
s := &Server{}
s.stateCode.Store(commonpb.StateCode_Initializing)
resp, err := s.GetChannelRecoveryInfo(ctx, nil)
assert.NoError(t, err)
assert.NotEqual(t, int32(0), resp.GetStatus().GetCode())
s.stateCode.Store(commonpb.StateCode_Healthy)

// get collection failed
handler := NewNMockHandler(t)
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).
Return(nil, errors.New("mock err"))
s.handler = handler
assert.NoError(t, err)
resp, err = s.GetChannelRecoveryInfo(ctx, &datapb.GetChannelRecoveryInfoRequest{
Vchannel: "ch-1",
})
assert.NoError(t, err)
assert.Error(t, merr.Error(resp.GetStatus()))

// normal case
channelInfo := &datapb.VchannelInfo{
CollectionID: 0,
ChannelName: "ch-1",
SeekPosition: nil,
UnflushedSegmentIds: []int64{1},
FlushedSegmentIds: []int64{2},
DroppedSegmentIds: []int64{3},
IndexedSegmentIds: []int64{4},
}

handler = NewNMockHandler(t)
handler.EXPECT().GetCollection(mock.Anything, mock.Anything).
Return(&collectionInfo{Schema: &schemapb.CollectionSchema{}}, nil)
handler.EXPECT().GetDataVChanPositions(mock.Anything, mock.Anything).Return(channelInfo)
s.handler = handler

assert.NoError(t, err)
resp, err = s.GetChannelRecoveryInfo(ctx, &datapb.GetChannelRecoveryInfoRequest{
Vchannel: "ch-1",
})
assert.NoError(t, err)
assert.Equal(t, int32(0), resp.GetStatus().GetCode())
assert.NotNil(t, resp.GetSchema())
assert.Equal(t, channelInfo, resp.GetInfo())
}

type GcControlServiceSuite struct {
suite.Suite

Expand Down
12 changes: 6 additions & 6 deletions internal/datanode/channel/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/datanode/util"
"github.com/milvus-io/milvus/internal/flushcommon/pipeline"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lifetime"
Expand Down Expand Up @@ -241,7 +241,7 @@ type opRunner struct {
watchFunc watchFunc

guard sync.RWMutex
allOps map[util.UniqueID]*opInfo // opID -> tickler
allOps map[typeutil.UniqueID]*opInfo // opID -> tickler
opsInQueue chan *datapb.ChannelWatchInfo
resultCh chan *opState

Expand All @@ -256,7 +256,7 @@ func NewOpRunner(channel string, pipelineParams *util.PipelineParams, releaseF r
releaseFunc: releaseF,
watchFunc: watchF,
opsInQueue: make(chan *datapb.ChannelWatchInfo, 10),
allOps: make(map[util.UniqueID]*opInfo),
allOps: make(map[typeutil.UniqueID]*opInfo),
resultCh: resultCh,
closeCh: lifetime.NewSafeChan(),
}
Expand All @@ -277,13 +277,13 @@ func (r *opRunner) Start() {
}()
}

func (r *opRunner) FinishOp(opID util.UniqueID) {
func (r *opRunner) FinishOp(opID typeutil.UniqueID) {
r.guard.Lock()
defer r.guard.Unlock()
delete(r.allOps, opID)
}

func (r *opRunner) Exist(opID util.UniqueID) (progress int32, exists bool) {
func (r *opRunner) Exist(opID typeutil.UniqueID) (progress int32, exists bool) {
r.guard.RLock()
defer r.guard.RUnlock()
info, ok := r.allOps[opID]
Expand Down Expand Up @@ -423,7 +423,7 @@ func (r *opRunner) watchWithTimer(info *datapb.ChannelWatchInfo) *opState {
}

// releaseWithTimer will return ReleaseFailure after WatchTimeoutInterval
func (r *opRunner) releaseWithTimer(releaseFunc releaseFunc, channel string, opID util.UniqueID) *opState {
func (r *opRunner) releaseWithTimer(releaseFunc releaseFunc, channel string, opID typeutil.UniqueID) *opState {
opState := &opState{
channel: channel,
opID: opID,
Expand Down
70 changes: 50 additions & 20 deletions internal/datanode/channel/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/broker"
"github.com/milvus-io/milvus/internal/datanode/util"
"github.com/milvus-io/milvus/internal/flushcommon/broker"
"github.com/milvus-io/milvus/internal/flushcommon/pipeline"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
util2 "github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
Expand All @@ -43,10 +46,6 @@ import (

func TestMain(t *testing.M) {
paramtable.Init()
err := util.InitGlobalRateCollector()
if err != nil {
panic("init test failed, err = " + err.Error())
}
code := t.Run()
os.Exit(code)
}
Expand Down Expand Up @@ -74,10 +73,10 @@ func (s *OpRunnerSuite) SetupTest() {
Return(make(chan *msgstream.MsgPack), nil).Maybe()
dispClient.EXPECT().Deregister(mock.Anything).Maybe()

s.pipelineParams = &util.PipelineParams{
s.pipelineParams = &util2.PipelineParams{
Ctx: context.TODO(),
Session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 0}},
CheckpointUpdater: util.NewChannelCheckpointUpdater(mockedBroker),
CheckpointUpdater: util2.NewChannelCheckpointUpdater(mockedBroker),
WriteBufferManager: wbManager,
Broker: mockedBroker,
DispClient: dispClient,
Expand All @@ -91,7 +90,7 @@ func (s *OpRunnerSuite) TestWatchWithTimer() {
channel string = "ch-1"
commuCh = make(chan *opState)
)
info := util.GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
info := GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
mockReleaseFunc := func(channel string) {
log.Info("mock release func")
}
Expand All @@ -111,13 +110,13 @@ func (s *OpRunnerSuite) TestWatchTimeout() {
channel := "by-dev-rootcoord-dml-1000"
paramtable.Get().Save(paramtable.Get().DataCoordCfg.WatchTimeoutInterval.Key, "0.000001")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.WatchTimeoutInterval.Key)
info := util.GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
info := GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)

sig := make(chan struct{})
commuCh := make(chan *opState)

mockReleaseFunc := func(channel string) { log.Info("mock release func") }
mockWatchFunc := func(ctx context.Context, param *util.PipelineParams, info *datapb.ChannelWatchInfo, tickler *util.Tickler) (*pipeline.DataSyncService, error) {
mockWatchFunc := func(ctx context.Context, param *util2.PipelineParams, info *datapb.ChannelWatchInfo, tickler *util2.Tickler) (*pipeline.DataSyncService, error) {
<-ctx.Done()
sig <- struct{}{}
return nil, errors.New("timeout")
Expand All @@ -138,13 +137,13 @@ func (s *OpRunnerSuite) TestWatchTimeout() {

type OpRunnerSuite struct {
suite.Suite
pipelineParams *util.PipelineParams
pipelineParams *util2.PipelineParams
}

type ChannelManagerSuite struct {
suite.Suite

pipelineParams *util.PipelineParams
pipelineParams *util2.PipelineParams
manager *ChannelManagerImpl
}

Expand All @@ -160,7 +159,7 @@ func (s *ChannelManagerSuite) SetupTest() {
mockedBroker := &broker.MockBroker{}
mockedBroker.EXPECT().GetSegmentInfo(mock.Anything, mock.Anything).Return([]*datapb.SegmentInfo{}, nil).Maybe()

s.pipelineParams = &util.PipelineParams{
s.pipelineParams = &util2.PipelineParams{
Ctx: context.TODO(),
Session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 0}},
WriteBufferManager: wbManager,
Expand Down Expand Up @@ -189,7 +188,7 @@ func (s *ChannelManagerSuite) TestReleaseStuck() {
stuckSig <- struct{}{}
}

info := util.GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
info := GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
s.Require().Equal(0, s.manager.opRunners.Len())
err := s.manager.Submit(info)
s.Require().NoError(err)
Expand All @@ -199,7 +198,7 @@ func (s *ChannelManagerSuite) TestReleaseStuck() {

s.manager.handleOpState(opState)

releaseInfo := util.GetWatchInfoByOpID(101, channel, datapb.ChannelWatchState_ToRelease)
releaseInfo := GetWatchInfoByOpID(101, channel, datapb.ChannelWatchState_ToRelease)
paramtable.Get().Save(paramtable.Get().DataCoordCfg.WatchTimeoutInterval.Key, "0.1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.WatchTimeoutInterval.Key)

Expand All @@ -225,7 +224,7 @@ func (s *ChannelManagerSuite) TestReleaseStuck() {
func (s *ChannelManagerSuite) TestSubmitIdempotent() {
channel := "by-dev-rootcoord-dml-1"

info := util.GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
info := GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
s.Require().Equal(0, s.manager.opRunners.Len())

for i := 0; i < 10; i++ {
Expand All @@ -244,7 +243,7 @@ func (s *ChannelManagerSuite) TestSubmitIdempotent() {
func (s *ChannelManagerSuite) TestSubmitSkip() {
channel := "by-dev-rootcoord-dml-1"

info := util.GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
info := GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
s.Require().Equal(0, s.manager.opRunners.Len())

err := s.manager.Submit(info)
Expand All @@ -271,7 +270,7 @@ func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() {
channel := "by-dev-rootcoord-dml-0"

// watch
info := util.GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
info := GetWatchInfoByOpID(100, channel, datapb.ChannelWatchState_ToWatch)
err := s.manager.Submit(info)
s.NoError(err)

Expand All @@ -296,7 +295,7 @@ func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() {
s.Equal(datapb.ChannelWatchState_WatchSuccess, resp.GetState())

// release
info = util.GetWatchInfoByOpID(101, channel, datapb.ChannelWatchState_ToRelease)
info = GetWatchInfoByOpID(101, channel, datapb.ChannelWatchState_ToRelease)
err = s.manager.Submit(info)
s.NoError(err)

Expand All @@ -320,3 +319,34 @@ func (s *ChannelManagerSuite) TestSubmitWatchAndRelease() {
s.False(ok)
s.Nil(runner)
}

func GetWatchInfoByOpID(opID typeutil.UniqueID, channel string, state datapb.ChannelWatchState) *datapb.ChannelWatchInfo {
return &datapb.ChannelWatchInfo{
OpID: opID,
State: state,
Vchan: &datapb.VchannelInfo{
CollectionID: 1,
ChannelName: channel,
},
Schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64,
},
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "128"},
},
},
},
},
}
}
2 changes: 1 addition & 1 deletion internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/proto/clusteringpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
Expand Down
Loading

0 comments on commit a4439cc

Please sign in to comment.