From 1960deccdc031571d90251ab402e2cebc35acd69 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 13 Nov 2024 11:35:09 +0800 Subject: [PATCH 1/7] enhance: Reduce GetRecoveryInfo calls from querycoord Signed-off-by: bigsheeper --- internal/datacoord/dataview/data_view.go | 28 ++++ internal/datacoord/dataview/update_chan.go | 33 ++++ internal/datacoord/dataview/view_manager.go | 158 ++++++++++++++++++ internal/datacoord/server.go | 6 + internal/datacoord/services.go | 128 +++++++++++--- .../distributed/datacoord/client/client.go | 12 ++ internal/distributed/datacoord/service.go | 5 + internal/mocks/mock_datacoord.go | 59 +++++++ internal/mocks/mock_datacoord_client.go | 74 ++++++++ internal/proto/data_coord.proto | 11 ++ .../querycoordv2/meta/coordinator_broker.go | 21 +++ internal/querycoordv2/meta/mock_broker.go | 59 +++++++ .../querycoordv2/observers/target_observer.go | 59 ++++++- .../observers/target_observer_test.go | 8 + internal/querycoordv2/services_test.go | 13 ++ pkg/util/paramtable/component_param.go | 13 ++ pkg/util/typeutil/map.go | 2 +- 17 files changed, 656 insertions(+), 33 deletions(-) create mode 100644 internal/datacoord/dataview/data_view.go create mode 100644 internal/datacoord/dataview/update_chan.go create mode 100644 internal/datacoord/dataview/view_manager.go diff --git a/internal/datacoord/dataview/data_view.go b/internal/datacoord/dataview/data_view.go new file mode 100644 index 0000000000000..c34b59b820373 --- /dev/null +++ b/internal/datacoord/dataview/data_view.go @@ -0,0 +1,28 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataview + +import "github.com/milvus-io/milvus/internal/proto/datapb" + +const InitialDataViewVersion = 0 + +type DataView struct { + CollectionID int64 + Channels map[string]*datapb.VchannelInfo + Segments map[int64]struct{} + Version int64 +} diff --git a/internal/datacoord/dataview/update_chan.go b/internal/datacoord/dataview/update_chan.go new file mode 100644 index 0000000000000..ba139d68bcd45 --- /dev/null +++ b/internal/datacoord/dataview/update_chan.go @@ -0,0 +1,33 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package dataview + +import "sync" + +var updateChan chan int64 +var initOnce sync.Once + +func initUpdateChan() { + initOnce.Do(func() { + updateChan = make(chan int64, 1024) + }) +} + +// NotifyUpdate used to trigger updating data view immediately. +func NotifyUpdate(collectionID int64) { + updateChan <- collectionID +} diff --git a/internal/datacoord/dataview/view_manager.go b/internal/datacoord/dataview/view_manager.go new file mode 100644 index 0000000000000..9eed50065bad2 --- /dev/null +++ b/internal/datacoord/dataview/view_manager.go @@ -0,0 +1,158 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataview + +import ( + "sync" + "time" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type PullNewDataViewFunction func(collectionID int64) (*DataView, error) + +type ViewManager interface { + Get(collectionID int64) (*DataView, error) + GetVersion(collectionID int64) int64 + + Start() + Close() +} + +type dataViewManager struct { + pullFn PullNewDataViewFunction + currentViews *typeutil.ConcurrentMap[int64, *DataView] + + closeOnce sync.Once + closeChan chan struct{} +} + +func NewDataViewManager(pullFn PullNewDataViewFunction) ViewManager { + initUpdateChan() + return &dataViewManager{ + pullFn: pullFn, + currentViews: typeutil.NewConcurrentMap[int64, *DataView](), + closeChan: make(chan struct{}), + } +} + +func (m *dataViewManager) Get(collectionID int64) (*DataView, error) { + if view, ok := m.currentViews.Get(collectionID); ok { + return view, nil + } + view, err := m.pullFn(collectionID) + if err != nil { + return nil, err + } + m.currentViews.GetOrInsert(collectionID, view) + return view, nil +} + +func (m *dataViewManager) GetVersion(collectionID int64) int64 { + if view, ok := m.currentViews.Get(collectionID); ok { + return view.Version + } + return InitialDataViewVersion +} + +func (m *dataViewManager) Start() { + ticker := time.NewTicker(paramtable.Get().DataCoordCfg.DataViewUpdateInterval.GetAsDuration(time.Second)) + defer ticker.Stop() + for { + select { + case <-m.closeChan: + log.Info("data view manager exited") + return + case <-ticker.C: + // periodically update all data view + for _, collectionID := range m.currentViews.Keys() { + m.TryUpdateDataView(collectionID) + } + case collectionID := <-updateChan: + m.TryUpdateDataView(collectionID) + } + } +} + +func (m *dataViewManager) Close() { + m.closeOnce.Do(func() { + close(m.closeChan) + }) +} + +func (m *dataViewManager) update(view *DataView) { + _, ok := 
m.currentViews.GetOrInsert(view.CollectionID, view) + if ok { + log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version)) + } +} + +func (m *dataViewManager) TryUpdateDataView(collectionID int64) { + newView, err := m.pullFn(collectionID) + if err != nil { + log.Warn("pull new data view failed", zap.Int64("collectionID", collectionID), zap.Error(err)) + // notify to trigger pull again + NotifyUpdate(collectionID) + return + } + + currentView, ok := m.currentViews.Get(collectionID) + if !ok { + m.currentViews.GetOrInsert(collectionID, newView) + return + } + // no-op if the incoming version is less than the current version. + if newView.Version <= currentView.Version { + return + } + + // check if channel info has been updated. + for channel, new := range newView.Channels { + current, ok := currentView.Channels[channel] + if !ok { + m.update(newView) + return + } + if !funcutil.SliceSetEqual(new.GetLevelZeroSegmentIds(), current.GetLevelZeroSegmentIds()) || + !funcutil.SliceSetEqual(new.GetUnflushedSegmentIds(), current.GetUnflushedSegmentIds()) || + !funcutil.SliceSetEqual(new.GetFlushedSegmentIds(), current.GetFlushedSegmentIds()) || + !funcutil.SliceSetEqual(new.GetIndexedSegmentIds(), current.GetIndexedSegmentIds()) || + !funcutil.SliceSetEqual(new.GetDroppedSegmentIds(), current.GetDroppedSegmentIds()) { + m.update(newView) + return + } + if !typeutil.MapEqual(new.GetPartitionStatsVersions(), current.GetPartitionStatsVersions()) { + m.update(newView) + return + } + // TODO: It might be too frequent. + if new.GetSeekPosition().GetTimestamp() > current.GetSeekPosition().GetTimestamp() { + m.update(newView) + return + } + } + + // check if segment info has been updated. + if !typeutil.MapEqual(newView.Segments, currentView.Segments) { + m.currentViews.GetOrInsert(collectionID, newView) + } +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index c735681951f3f..9520861eac375 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -40,6 +40,7 @@ import ( globalIDAllocator "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/broker" + "github.com/milvus-io/milvus/internal/datacoord/dataview" "github.com/milvus-io/milvus/internal/datacoord/session" datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" @@ -125,6 +126,7 @@ type Server struct { importMeta ImportMeta importScheduler ImportScheduler importChecker ImportChecker + viewManager dataview.ViewManager compactionTrigger trigger compactionHandler compactionPlanContext @@ -415,6 +417,8 @@ func (s *Server) initDataCoord() error { s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta) s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.importMeta, s.jobManager) + s.viewManager = dataview.NewDataViewManager(s.pullNewDataView) + s.syncSegmentsScheduler = newSyncSegmentsScheduler(s.meta, s.channelManager, s.sessionManager) s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) @@ -751,6 +755,7 @@ func (s *Server) startServerLoop() { s.startFlushLoop(s.serverLoopCtx) go s.importScheduler.Start() go s.importChecker.Start() + go s.viewManager.Start() s.garbageCollector.start() if !(streamingutil.IsStreamingServiceEnabled() || 
paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) { @@ -1091,6 +1096,7 @@ func (s *Server) Stop() error { s.importScheduler.Close() s.importChecker.Close() + s.viewManager.Close() s.syncSegmentsScheduler.Stop() s.stopCompaction() diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 3b263a9383fee..e3dc4cdeb36e7 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/datacoord/dataview" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" @@ -844,13 +845,35 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf return resp, nil } +// GetDataViewVersions retrieves the data view versions of the target collections. +func (s *Server) GetDataViewVersions(ctx context.Context, req *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error) { + log := log.Ctx(ctx).With(zap.Int("numCollections", len(req.GetCollectionIDs()))) + log.Info("GetDataViewVersions request received") + resp := &datapb.GetDataViewVersionsResponse{ + Status: merr.Success(), + } + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GetDataViewVersionsResponse{ + Status: merr.Status(err), + }, nil + } + + versions := make(map[int64]int64, len(req.GetCollectionIDs())) + for _, id := range req.GetCollectionIDs() { + versions[id] = s.viewManager.GetVersion(id) + } + + resp.DataViewVersions = versions + log.Info("GetDataViewVersions done") + return resp, nil +} + // GetRecoveryInfoV2 get recovery info for segment // Called by: QueryCoord. func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryInfoRequestV2) (*datapb.GetRecoveryInfoResponseV2, error) { - log := log.Ctx(ctx) collectionID := req.GetCollectionID() partitionIDs := req.GetPartitionIDs() - log = log.With( + log := log.Ctx(ctx).With( zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), ) @@ -863,13 +886,21 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI Status: merr.Status(err), }, nil } - channels := s.channelManager.GetChannelsByCollectionID(collectionID) - channelInfos := make([]*datapb.VchannelInfo, 0, len(channels)) - flushedIDs := make(typeutil.UniqueSet) - for _, ch := range channels { - channelInfo := s.handler.GetQueryVChanPositions(ch, partitionIDs...) 
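+	// GetRecoveryInfoV2 now answers from the data view cached by the view manager
+	// instead of rebuilding channel positions and the flushed-segment set from
+	// channelManager/meta on every QueryCoord call; that work moves into
+	// pullNewDataView further down in this file.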
+ + dataView, err := s.viewManager.Get(req.GetCollectionID()) + if err != nil { + log.Warn("get data view failed in GetRecoveryInfoV2", zap.Error(err)) + resp.Status = merr.Status(err) + return resp, nil + } + + channelInfos := make([]*datapb.VchannelInfo, 0, len(dataView.Channels)) + for _, info := range dataView.Channels { + channelInfo := typeutil.Clone(info) + // retrieve target partition stats versions + channelInfo.PartitionStatsVersions = lo.PickByKeys(channelInfo.PartitionStatsVersions, req.GetPartitionIDs()) channelInfos = append(channelInfos, channelInfo) - log.Info("datacoord append channelInfo in GetRecoveryInfo", + log.Info("datacoord append channelInfo in GetRecoveryInfoV2", zap.String("channel", channelInfo.GetChannelName()), zap.Int("# of unflushed segments", len(channelInfo.GetUnflushedSegmentIds())), zap.Int("# of flushed segments", len(channelInfo.GetFlushedSegmentIds())), @@ -877,31 +908,17 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI zap.Int("# of indexed segments", len(channelInfo.GetIndexedSegmentIds())), zap.Int("# of l0 segments", len(channelInfo.GetLevelZeroSegmentIds())), ) - flushedIDs.Insert(channelInfo.GetFlushedSegmentIds()...) } segmentInfos := make([]*datapb.SegmentInfo, 0) - for id := range flushedIDs { + for id := range dataView.Segments { segment := s.meta.GetSegment(id) if segment == nil { - err := merr.WrapErrSegmentNotFound(id) + err = merr.WrapErrSegmentNotFound(id) log.Warn("failed to get segment", zap.Int64("segmentID", id)) resp.Status = merr.Status(err) return resp, nil } - // Skip non-flushing, non-flushed and dropped segments. - if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing && segment.State != commonpb.SegmentState_Dropped { - continue - } - // Also skip bulk insert segments. 
- if segment.GetIsImporting() { - continue - } - - binlogs := segment.GetBinlogs() - if len(binlogs) == 0 && segment.GetLevel() != datapb.SegmentLevel_L0 { - continue - } rowCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo) if rowCount != segment.NumOfRows && rowCount > 0 { log.Warn("segment row number meta inconsistent with bin log row count and will be corrected", @@ -911,7 +928,6 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI } else { rowCount = segment.NumOfRows } - segmentInfos = append(segmentInfos, &datapb.SegmentInfo{ ID: segment.ID, PartitionID: segment.PartitionID, @@ -928,6 +944,68 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI return resp, nil } +func (s *Server) pullNewDataView(collectionID int64) (*dataview.DataView, error) { + version := time.Now().UnixNano() + log := log.With( + zap.Int64("collectionID", collectionID), + zap.Int64("version", version), + ) + + channels := s.channelManager.GetChannelsByCollectionID(collectionID) + channelInfos := make([]*datapb.VchannelInfo, 0, len(channels)) + flushedIDs := make(typeutil.UniqueSet) + + for _, ch := range channels { + channelInfo := s.handler.GetQueryVChanPositions(ch, allPartitionID) + channelInfos = append(channelInfos, channelInfo) + log.Info("datacoord append channelInfo in pullNewDataView", + zap.String("channel", channelInfo.GetChannelName()), + zap.Int("# of unflushed segments", len(channelInfo.GetUnflushedSegmentIds())), + zap.Int("# of flushed segments", len(channelInfo.GetFlushedSegmentIds())), + zap.Int("# of dropped segments", len(channelInfo.GetDroppedSegmentIds())), + zap.Int("# of indexed segments", len(channelInfo.GetIndexedSegmentIds())), + zap.Int("# of l0 segments", len(channelInfo.GetLevelZeroSegmentIds())), + ) + flushedIDs.Insert(channelInfo.GetFlushedSegmentIds()...) + } + + segments := make([]int64, 0) + for id := range flushedIDs { + segment := s.meta.GetSegment(id) + if segment == nil { + err := merr.WrapErrSegmentNotFound(id) + log.Warn("failed to get segment", zap.Int64("segmentID", id)) + return nil, err + } + // Skip non-flushing, non-flushed and dropped segments. + if segment.State != commonpb.SegmentState_Flushed && segment.State != commonpb.SegmentState_Flushing && segment.State != commonpb.SegmentState_Dropped { + continue + } + // Also skip bulk insert segments. + if segment.GetIsImporting() { + continue + } + + binlogs := segment.GetBinlogs() + if len(binlogs) == 0 && segment.GetLevel() != datapb.SegmentLevel_L0 { + continue + } + segments = append(segments, id) + } + + newDV := &dataview.DataView{ + CollectionID: collectionID, + Channels: lo.KeyBy(channelInfos, func(v *datapb.VchannelInfo) string { + return v.GetChannelName() + }), + Segments: lo.SliceToMap(segments, func(id int64) (int64, struct{}) { + return id, struct{}{} + }), + Version: version, + } + return newDV, nil +} + // GetChannelRecoveryInfo get recovery channel info. // Called by: StreamingNode. 
func (s *Server) GetChannelRecoveryInfo(ctx context.Context, req *datapb.GetChannelRecoveryInfoRequest) (*datapb.GetChannelRecoveryInfoResponse, error) { diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go index bb095cdae30b0..546933adb0cd1 100644 --- a/internal/distributed/datacoord/client/client.go +++ b/internal/distributed/datacoord/client/client.go @@ -321,6 +321,18 @@ func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath }) } +// GetDataViewVersions retrieves the data view versions of the target collections. +func (c *Client) GetDataViewVersions(ctx context.Context, req *datapb.GetDataViewVersionsRequest, opts ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error) { + req = typeutil.Clone(req) + commonpbutil.UpdateMsgBase( + req.GetBase(), + commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)), + ) + return wrapGrpcCall(ctx, c, func(client datapb.DataCoordClient) (*datapb.GetDataViewVersionsResponse, error) { + return client.GetDataViewVersions(ctx, req) + }) +} + // GetRecoveryInfo request segment recovery info of collection/partition // // ctx is the context to control request deadline and cancellation diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go index ee17f8c0d3a03..c76a76e04b798 100644 --- a/internal/distributed/datacoord/service.go +++ b/internal/distributed/datacoord/service.go @@ -350,6 +350,11 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath return s.dataCoord.SaveBinlogPaths(ctx, req) } +// GetDataViewVersions retrieves the data view versions of the target collections. +func (s *Server) GetDataViewVersions(ctx context.Context, req *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error) { + return s.dataCoord.GetDataViewVersions(ctx, req) +} + // GetRecoveryInfo gets information for recovering channels func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) { return s.dataCoord.GetRecoveryInfo(ctx, req) diff --git a/internal/mocks/mock_datacoord.go b/internal/mocks/mock_datacoord.go index b33f31f3c155f..363910eda1ecb 100644 --- a/internal/mocks/mock_datacoord.go +++ b/internal/mocks/mock_datacoord.go @@ -1041,6 +1041,65 @@ func (_c *MockDataCoord_GetComponentStates_Call) RunAndReturn(run func(context.C return _c } +// GetDataViewVersions provides a mock function with given fields: _a0, _a1 +func (_m *MockDataCoord) GetDataViewVersions(_a0 context.Context, _a1 *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error) { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for GetDataViewVersions") + } + + var r0 *datapb.GetDataViewVersionsResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest) *datapb.GetDataViewVersionsResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.GetDataViewVersionsResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *datapb.GetDataViewVersionsRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataCoord_GetDataViewVersions_Call is a 
*mock.Call that shadows Run/Return methods with type explicit version for method 'GetDataViewVersions' +type MockDataCoord_GetDataViewVersions_Call struct { + *mock.Call +} + +// GetDataViewVersions is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *datapb.GetDataViewVersionsRequest +func (_e *MockDataCoord_Expecter) GetDataViewVersions(_a0 interface{}, _a1 interface{}) *MockDataCoord_GetDataViewVersions_Call { + return &MockDataCoord_GetDataViewVersions_Call{Call: _e.mock.On("GetDataViewVersions", _a0, _a1)} +} + +func (_c *MockDataCoord_GetDataViewVersions_Call) Run(run func(_a0 context.Context, _a1 *datapb.GetDataViewVersionsRequest)) *MockDataCoord_GetDataViewVersions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*datapb.GetDataViewVersionsRequest)) + }) + return _c +} + +func (_c *MockDataCoord_GetDataViewVersions_Call) Return(_a0 *datapb.GetDataViewVersionsResponse, _a1 error) *MockDataCoord_GetDataViewVersions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataCoord_GetDataViewVersions_Call) RunAndReturn(run func(context.Context, *datapb.GetDataViewVersionsRequest) (*datapb.GetDataViewVersionsResponse, error)) *MockDataCoord_GetDataViewVersions_Call { + _c.Call.Return(run) + return _c +} + // GetFlushAllState provides a mock function with given fields: _a0, _a1 func (_m *MockDataCoord) GetFlushAllState(_a0 context.Context, _a1 *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_datacoord_client.go b/internal/mocks/mock_datacoord_client.go index 9f42a0f953877..c473c8d812f0f 100644 --- a/internal/mocks/mock_datacoord_client.go +++ b/internal/mocks/mock_datacoord_client.go @@ -1336,6 +1336,80 @@ func (_c *MockDataCoordClient_GetComponentStates_Call) RunAndReturn(run func(con return _c } +// GetDataViewVersions provides a mock function with given fields: ctx, in, opts +func (_m *MockDataCoordClient) GetDataViewVersions(ctx context.Context, in *datapb.GetDataViewVersionsRequest, opts ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for GetDataViewVersions") + } + + var r0 *datapb.GetDataViewVersionsResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) *datapb.GetDataViewVersionsResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datapb.GetDataViewVersionsResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) 
+ } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataCoordClient_GetDataViewVersions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDataViewVersions' +type MockDataCoordClient_GetDataViewVersions_Call struct { + *mock.Call +} + +// GetDataViewVersions is a helper method to define mock.On call +// - ctx context.Context +// - in *datapb.GetDataViewVersionsRequest +// - opts ...grpc.CallOption +func (_e *MockDataCoordClient_Expecter) GetDataViewVersions(ctx interface{}, in interface{}, opts ...interface{}) *MockDataCoordClient_GetDataViewVersions_Call { + return &MockDataCoordClient_GetDataViewVersions_Call{Call: _e.mock.On("GetDataViewVersions", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockDataCoordClient_GetDataViewVersions_Call) Run(run func(ctx context.Context, in *datapb.GetDataViewVersionsRequest, opts ...grpc.CallOption)) *MockDataCoordClient_GetDataViewVersions_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*datapb.GetDataViewVersionsRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockDataCoordClient_GetDataViewVersions_Call) Return(_a0 *datapb.GetDataViewVersionsResponse, _a1 error) *MockDataCoordClient_GetDataViewVersions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataCoordClient_GetDataViewVersions_Call) RunAndReturn(run func(context.Context, *datapb.GetDataViewVersionsRequest, ...grpc.CallOption) (*datapb.GetDataViewVersionsResponse, error)) *MockDataCoordClient_GetDataViewVersions_Call { + _c.Call.Return(run) + return _c +} + // GetFlushAllState provides a mock function with given fields: ctx, in, opts func (_m *MockDataCoordClient) GetFlushAllState(ctx context.Context, in *milvuspb.GetFlushAllStateRequest, opts ...grpc.CallOption) (*milvuspb.GetFlushAllStateResponse, error) { _va := make([]interface{}, len(opts)) diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index ecab1cdcfb07c..2cc6cd2c6d8de 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -52,6 +52,7 @@ service DataCoord { rpc GetSegmentInfoChannel(GetSegmentInfoChannelRequest) returns (milvus.StringResponse){} rpc SaveBinlogPaths(SaveBinlogPathsRequest) returns (common.Status){} + rpc GetDataViewVersions(GetDataViewVersionsRequest) returns (GetDataViewVersionsResponse){} rpc GetRecoveryInfo(GetRecoveryInfoRequest) returns (GetRecoveryInfoResponse){} rpc GetRecoveryInfoV2(GetRecoveryInfoRequestV2) returns (GetRecoveryInfoResponseV2){} rpc GetChannelRecoveryInfo(GetChannelRecoveryInfoRequest) returns (GetChannelRecoveryInfoResponse){} @@ -462,6 +463,16 @@ message Binlog { int64 memory_size = 7; } +message GetDataViewVersionsRequest { + common.MsgBase base = 1; + repeated int64 collectionIDs = 2; +} + +message GetDataViewVersionsResponse { + common.Status status = 1; + map<int64, int64> data_view_versions = 2; +} + message GetRecoveryInfoResponse { common.Status status = 1; repeated VchannelInfo channels = 2; diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index e6ab9a104f821..61188cc30a7a9 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -49,6 +49,7 @@ type Broker interface { ListIndexes(ctx context.Context, collectionID 
UniqueID) ([]*indexpb.IndexInfo, error) GetSegmentInfo(ctx context.Context, segmentID ...UniqueID) ([]*datapb.SegmentInfo, error) GetIndexInfo(ctx context.Context, collectionID UniqueID, segmentIDs ...UniqueID) (map[int64][]*querypb.FieldIndexInfo, error) + GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) DescribeDatabase(ctx context.Context, dbName string) (*rootcoordpb.DescribeDatabaseResponse, error) GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error) @@ -231,6 +232,26 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection return recoveryInfo.Channels, recoveryInfo.Binlogs, nil } +// GetDataViewVersions retrieves the data view versions of the target collections. +func (broker *CoordinatorBroker) GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) { + ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) + defer cancel() + log := log.Ctx(ctx).With(zap.Int("numCollection", len(collectionIDs))) + + req := &datapb.GetDataViewVersionsRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_GetRecoveryInfo), + ), + CollectionIDs: collectionIDs, + } + resp, err := broker.dataCoord.GetDataViewVersions(ctx, req) + if err = merr.CheckRPCCall(resp, err); err != nil { + log.Warn("GetDataViewVersions failed", zap.Error(err)) + return nil, err + } + return resp.GetDataViewVersions(), nil +} + func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collectionID UniqueID, partitionIDs ...UniqueID) ([]*datapb.VchannelInfo, []*datapb.SegmentInfo, error) { ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() diff --git a/internal/querycoordv2/meta/mock_broker.go b/internal/querycoordv2/meta/mock_broker.go index 23b1b15f28e12..b510aff115502 100644 --- a/internal/querycoordv2/meta/mock_broker.go +++ b/internal/querycoordv2/meta/mock_broker.go @@ -214,6 +214,65 @@ func (_c *MockBroker_GetCollectionLoadInfo_Call) RunAndReturn(run func(context.C return _c } +// GetDataViewVersions provides a mock function with given fields: ctx, collectionIDs +func (_m *MockBroker) GetDataViewVersions(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) { + ret := _m.Called(ctx, collectionIDs) + + if len(ret) == 0 { + panic("no return value specified for GetDataViewVersions") + } + + var r0 map[int64]int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, []int64) (map[int64]int64, error)); ok { + return rf(ctx, collectionIDs) + } + if rf, ok := ret.Get(0).(func(context.Context, []int64) map[int64]int64); ok { + r0 = rf(ctx, collectionIDs) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]int64) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []int64) error); ok { + r1 = rf(ctx, collectionIDs) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockBroker_GetDataViewVersions_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDataViewVersions' +type MockBroker_GetDataViewVersions_Call struct { + *mock.Call +} + +// GetDataViewVersions is a helper method to define mock.On call +// - ctx context.Context +// - collectionIDs []int64 +func (_e 
*MockBroker_Expecter) GetDataViewVersions(ctx interface{}, collectionIDs interface{}) *MockBroker_GetDataViewVersions_Call { + return &MockBroker_GetDataViewVersions_Call{Call: _e.mock.On("GetDataViewVersions", ctx, collectionIDs)} +} + +func (_c *MockBroker_GetDataViewVersions_Call) Run(run func(ctx context.Context, collectionIDs []int64)) *MockBroker_GetDataViewVersions_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]int64)) + }) + return _c +} + +func (_c *MockBroker_GetDataViewVersions_Call) Return(_a0 map[int64]int64, _a1 error) *MockBroker_GetDataViewVersions_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockBroker_GetDataViewVersions_Call) RunAndReturn(run func(context.Context, []int64) (map[int64]int64, error)) *MockBroker_GetDataViewVersions_Call { + _c.Call.Return(run) + return _c +} + // GetIndexInfo provides a mock function with given fields: ctx, collectionID, segmentIDs func (_m *MockBroker) GetIndexInfo(ctx context.Context, collectionID int64, segmentIDs ...int64) (map[int64][]*querypb.FieldIndexInfo, error) { _va := make([]interface{}, len(segmentIDs)) diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 45e6345488b45..d8c0bef1a00a7 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -40,6 +40,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +const InitialDataViewVersion = 0 + type targetOp int func (op *targetOp) String() string { @@ -93,6 +95,8 @@ type TargetObserver struct { // loadedDispatcher updates targets for loaded collections. loadedDispatcher *taskDispatcher[int64] + dataViewVersions *typeutil.ConcurrentMap[int64, int64] + keylocks *lock.KeyLock[int64] startOnce sync.Once @@ -118,6 +122,7 @@ func NewTargetObserver( updateChan: make(chan targetUpdateRequest, 10), readyNotifiers: make(map[int64][]chan struct{}), initChan: make(chan initRequest), + dataViewVersions: typeutil.NewConcurrentMap[int64, int64](), keylocks: lock.NewKeyLock[int64](), } @@ -176,13 +181,8 @@ func (ob *TargetObserver) schedule(ctx context.Context) { case <-ticker.C: ob.clean() - loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) { - if collection.GetStatus() == querypb.LoadStatus_Loaded { - return collection.GetCollectionID(), true - } - return 0, false - }) - ob.loadedDispatcher.AddTask(loaded...) + versionUpdatedCollections := ob.GetVersionUpdatedCollections() + ob.loadedDispatcher.AddTask(versionUpdatedCollections...) 
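+			// Note: the periodic branch above now only enqueues collections whose data
+			// view version on DataCoord has advanced (see GetVersionUpdatedCollections
+			// below), so an unchanged collection no longer costs a GetRecoveryInfoV2
+			// round trip per tick; the manual-update branch that follows is unchanged.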
case req := <-ob.updateChan: log.Info("manually trigger update target", @@ -227,6 +227,41 @@ func (ob *TargetObserver) schedule(ctx context.Context) { } } +func (ob *TargetObserver) GetVersionUpdatedCollections() []int64 { + loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) { + if collection.GetStatus() == querypb.LoadStatus_Loaded { + return collection.GetCollectionID(), true + } + return 0, false + }) + versions, err := ob.broker.GetDataViewVersions(context.Background(), loaded) + if err != nil { + log.Warn("GetDataViewVersions from dc failed", zap.Error(err)) + return nil + } + + var ( + staleCnt int + updatedCnt int + ) + + ret := make([]int64, 0) + for _, id := range loaded { + new := versions[id] + current, ok := ob.dataViewVersions.Get(id) + if !ok || new == InitialDataViewVersion || new > current { + ret = append(ret, id) + ob.dataViewVersions.GetOrInsert(id, new) + updatedCnt++ + continue + } + staleCnt++ + } + log.Info("get version updated collections done", zap.Int("totalCnt", len(loaded)), + zap.Int("staleCnt", staleCnt), zap.Int("updatedCnt", updatedCnt)) + return ret +} + // Check whether provided collection is has current target. // If not, submit an async task into dispatcher. func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool { @@ -311,6 +346,14 @@ func (ob *TargetObserver) ReleasePartition(collectionID int64, partitionID ...in func (ob *TargetObserver) clean() { collectionSet := typeutil.NewUniqueSet(ob.meta.GetAll()...) + // for collection which has been dropped/released, clear data version cache + ob.dataViewVersions.Range(func(collectionID int64, _ int64) bool { + if !collectionSet.Contain(collectionID) { + ob.dataViewVersions.Remove(collectionID) + } + return true + }) + // for collection which has been removed from target, try to clear nextTargetLastUpdate ob.nextTargetLastUpdate.Range(func(collectionID int64, _ time.Time) bool { if !collectionSet.Contain(collectionID) { @@ -352,6 +395,8 @@ func (ob *TargetObserver) updateNextTarget(collectionID int64) error { if err != nil { log.Warn("failed to update next target for collection", zap.Error(err)) + // update next target failed, remove data view version cache + ob.dataViewVersions.Remove(collectionID) return err } ob.updateNextTargetTimestamp(collectionID) diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index dcbd8ec5f247e..2058aece3f7fb 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -182,6 +182,14 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { suite.broker.EXPECT(). GetRecoveryInfoV2(mock.Anything, mock.Anything). Return(suite.nextTargetChannels, suite.nextTargetSegments, nil) + suite.broker.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything). 
+ RunAndReturn(func(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) { + versions := make(map[int64]int64) + for _, collectionID := range collectionIDs { + versions[collectionID] = InitialDataViewVersion + } + return versions, nil + }) suite.Eventually(func() bool { return len(suite.targetMgr.GetSealedSegmentsByCollection(suite.collectionID, meta.NextTarget)) == 3 && diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index a573eea5c8de7..2dd15369e7bb0 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -1095,6 +1095,8 @@ func (suite *ServiceSuite) TestRefreshCollection() { suite.ErrorIs(err, merr.ErrCollectionNotLoaded) } + suite.expectGetDataViewVersions() + // Test load all collections suite.loadAll() @@ -1935,6 +1937,17 @@ func (suite *ServiceSuite) assertSegments(collection int64, segments []*querypb. return true } +func (suite *ServiceSuite) expectGetDataViewVersions() { + suite.broker.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, collectionIDs []int64) (map[int64]int64, error) { + versions := make(map[int64]int64) + for _, collectionID := range collectionIDs { + versions[collectionID] = observers.InitialDataViewVersion + } + return versions, nil + }) +} + func (suite *ServiceSuite) expectGetRecoverInfo(collection int64) { suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe() vChannels := []*datapb.VchannelInfo{} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 55bb1a1894d02..da60f7dbfefd2 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3318,6 +3318,9 @@ type dataCoordConfig struct { EnableStatsTask ParamItem `refreshable:"true"` TaskCheckInterval ParamItem `refreshable:"true"` + + // data view + DataViewUpdateInterval ParamItem `refreshable:"true"` } func (p *dataCoordConfig) init(base *BaseTable) { @@ -4191,6 +4194,16 @@ During compaction, the size of segment # of rows is able to exceed segment max # Export: false, } p.TaskCheckInterval.Init(base.mgr) + + p.DataViewUpdateInterval = ParamItem{ + Key: "dataCoord.dataView.updateInterval", + Version: "2.5.0", + Doc: "The interval (in seconds) for trying to update the data view of all collections.", + DefaultValue: "10", + PanicIfEmpty: false, + Export: false, + } + p.DataViewUpdateInterval.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/typeutil/map.go b/pkg/util/typeutil/map.go index be4f5af5b74ab..35459194b8b8a 100644 --- a/pkg/util/typeutil/map.go +++ b/pkg/util/typeutil/map.go @@ -7,7 +7,7 @@ import ( ) // MapEqual returns true if the two map contain the same keys and values -func MapEqual(left, right map[int64]int64) bool { +func MapEqual[K, V comparable](left, right map[K]V) bool { if len(left) != len(right) { return false } From da09929abd8ee02c3d5b3905a40a395612e9efd4 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Thu, 21 Nov 2024 20:26:38 +0800 Subject: [PATCH 2/7] add more test Signed-off-by: bigsheeper --- internal/datacoord/dataview/data_view.go | 2 +- internal/datacoord/dataview/view_manager.go | 37 ++- .../datacoord/dataview/view_manager_test.go | 217 ++++++++++++++++++ internal/datacoord/handler.go | 3 + 4 files changed, 247 insertions(+), 12 deletions(-) create mode 100644 internal/datacoord/dataview/view_manager_test.go diff 
--git a/internal/datacoord/dataview/data_view.go b/internal/datacoord/dataview/data_view.go index c34b59b820373..bfea1c1ff8120 100644 --- a/internal/datacoord/dataview/data_view.go +++ b/internal/datacoord/dataview/data_view.go @@ -18,7 +18,7 @@ package dataview import "github.com/milvus-io/milvus/internal/proto/datapb" -const InitialDataViewVersion = 0 +const InitialDataViewVersion int64 = 0 type DataView struct { CollectionID int64 diff --git a/internal/datacoord/dataview/view_manager.go b/internal/datacoord/dataview/view_manager.go index 9eed50065bad2..f5638f4e277b1 100644 --- a/internal/datacoord/dataview/view_manager.go +++ b/internal/datacoord/dataview/view_manager.go @@ -33,6 +33,7 @@ type PullNewDataViewFunction func(collectionID int64) (*DataView, error) type ViewManager interface { Get(collectionID int64) (*DataView, error) GetVersion(collectionID int64) int64 + Remove(collectionID int64) Start() Close() @@ -63,8 +64,12 @@ func (m *dataViewManager) Get(collectionID int64) (*DataView, error) { if err != nil { return nil, err } - m.currentViews.GetOrInsert(collectionID, view) - return view, nil + + v, ok := m.currentViews.GetOrInsert(collectionID, view) + if !ok { + log.Info("update new data view", zap.Int64("collectionID", collectionID), zap.Int64("version", view.Version)) + } + return v, nil } func (m *dataViewManager) GetVersion(collectionID int64) int64 { @@ -74,6 +79,12 @@ func (m *dataViewManager) GetVersion(collectionID int64) int64 { return InitialDataViewVersion } +func (m *dataViewManager) Remove(collectionID int64) { + if view, ok := m.currentViews.GetAndRemove(collectionID); ok { + log.Info("data view removed", zap.Int64("collectionID", collectionID), zap.Int64("version", view.Version)) + } +} + func (m *dataViewManager) Start() { ticker := time.NewTicker(paramtable.Get().DataCoordCfg.DataViewUpdateInterval.GetAsDuration(time.Second)) defer ticker.Stop() @@ -100,35 +111,36 @@ func (m *dataViewManager) Close() { } func (m *dataViewManager) update(view *DataView) { - _, ok := m.currentViews.GetOrInsert(view.CollectionID, view) - if ok { - log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version)) - } + m.currentViews.Insert(view.CollectionID, view) + log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version)) } func (m *dataViewManager) TryUpdateDataView(collectionID int64) { newView, err := m.pullFn(collectionID) if err != nil { log.Warn("pull new data view failed", zap.Int64("collectionID", collectionID), zap.Error(err)) - // notify to trigger pull again + // notify to trigger retry NotifyUpdate(collectionID) return } currentView, ok := m.currentViews.Get(collectionID) if !ok { - m.currentViews.GetOrInsert(collectionID, newView) + // update due to data view is empty + m.update(newView) return } // no-op if the incoming version is less than the current version. if newView.Version <= currentView.Version { + log.Warn("stale version, skip update", zap.Int64("collectionID", collectionID), + zap.Int64("new", newView.Version), zap.Int64("current", currentView.Version)) return } - // check if channel info has been updated. 
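+	// Compare the incoming view with the cached one channel by channel; any
+	// difference in the per-channel segment lists, partition stats versions or
+	// checkpoint below counts as a reason to adopt the new view.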
for channel, new := range newView.Channels { current, ok := currentView.Channels[channel] if !ok { + // update due to channel info is empty m.update(newView) return } @@ -137,22 +149,25 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) { !funcutil.SliceSetEqual(new.GetFlushedSegmentIds(), current.GetFlushedSegmentIds()) || !funcutil.SliceSetEqual(new.GetIndexedSegmentIds(), current.GetIndexedSegmentIds()) || !funcutil.SliceSetEqual(new.GetDroppedSegmentIds(), current.GetDroppedSegmentIds()) { + // update due to segments list changed m.update(newView) return } if !typeutil.MapEqual(new.GetPartitionStatsVersions(), current.GetPartitionStatsVersions()) { + // update due to partition stats changed m.update(newView) return } // TODO: It might be too frequent. if new.GetSeekPosition().GetTimestamp() > current.GetSeekPosition().GetTimestamp() { + // update due to channel cp advanced m.update(newView) return } } - // check if segment info has been updated. if !typeutil.MapEqual(newView.Segments, currentView.Segments) { - m.currentViews.GetOrInsert(collectionID, newView) + // update due to segments list changed + m.update(newView) } } diff --git a/internal/datacoord/dataview/view_manager_test.go b/internal/datacoord/dataview/view_manager_test.go new file mode 100644 index 0000000000000..afd41a6b7bd5e --- /dev/null +++ b/internal/datacoord/dataview/view_manager_test.go @@ -0,0 +1,217 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
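+
+// The tests below exercise the lazy Get/GetVersion path and each condition under
+// which TryUpdateDataView swaps in a freshly pulled view (first view, new channel,
+// segment-list change, partition-stats change, checkpoint advance), as well as the
+// no-further-update and pull-failure paths.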
+ +package dataview + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func init() { + paramtable.Init() +} + +func TestNewDataViewManager_Get(t *testing.T) { + pullFn := func(collectionID int64) (*DataView, error) { + return &DataView{ + CollectionID: collectionID, + Channels: nil, + Segments: nil, + Version: time.Now().UnixNano(), + }, nil + } + manager := NewDataViewManager(pullFn) + + collectionID := int64(1) + // No data view + version := manager.GetVersion(collectionID) + assert.Equal(t, InitialDataViewVersion, version) + + // Lazy get data view + v1, err := manager.Get(collectionID) + assert.NoError(t, err) + assert.NotEqual(t, InitialDataViewVersion, v1) + version = manager.GetVersion(v1.CollectionID) + assert.Equal(t, v1.Version, version) + + // Get again, data view should not update + v2, err := manager.Get(collectionID) + assert.NoError(t, err) + assert.Equal(t, v1, v2) +} + +func TestNewDataViewManager_TryUpdateDataView(t *testing.T) { + manager := NewDataViewManager(nil) + go manager.Start() + defer manager.Close() + + collectionID := int64(1) + + // Update due to data view is empty + v1 := &DataView{ + CollectionID: collectionID, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v1, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v1.Version + }, 1*time.Second, 10*time.Millisecond) + + // Update due to channel info is empty + v2 := &DataView{ + CollectionID: collectionID, + Channels: map[string]*datapb.VchannelInfo{"ch0": { + CollectionID: collectionID, + ChannelName: "ch0", + }}, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v2, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v2.Version + }, 1*time.Second, 10*time.Millisecond) + + // Update due to segments list changed + v3 := &DataView{ + CollectionID: collectionID, + Channels: map[string]*datapb.VchannelInfo{"ch0": { + CollectionID: collectionID, + ChannelName: "ch0", + UnflushedSegmentIds: []int64{100, 200}, + }}, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v3, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v3.Version + }, 1*time.Second, 10*time.Millisecond) + + // Update due to partition stats changed + v4 := &DataView{ + CollectionID: collectionID, + Channels: map[string]*datapb.VchannelInfo{"ch0": { + CollectionID: collectionID, + ChannelName: "ch0", + SeekPosition: &msgpb.MsgPosition{ + Timestamp: uint64(time.Now().UnixNano()), + }, + UnflushedSegmentIds: []int64{100, 200}, + PartitionStatsVersions: map[int64]int64{1000: 2000}, + }}, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v4, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v4.Version + }, 1*time.Second, 10*time.Millisecond) + + // Update due to channel cp advanced + v5 := 
&DataView{ + CollectionID: collectionID, + Channels: map[string]*datapb.VchannelInfo{"ch0": { + CollectionID: collectionID, + ChannelName: "ch0", + SeekPosition: &msgpb.MsgPosition{ + Timestamp: uint64(time.Now().UnixNano()), + }, + UnflushedSegmentIds: []int64{100, 200}, + PartitionStatsVersions: map[int64]int64{1000: 2000}, + }}, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v5, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v5.Version + }, 1*time.Second, 10*time.Millisecond) + + // Update due to segments list changed + v6 := &DataView{ + CollectionID: collectionID, + Channels: map[string]*datapb.VchannelInfo{"ch0": { + CollectionID: collectionID, + ChannelName: "ch0", + SeekPosition: &msgpb.MsgPosition{ + Timestamp: v5.Channels["ch0"].GetSeekPosition().GetTimestamp(), + }, + UnflushedSegmentIds: []int64{100, 200}, + PartitionStatsVersions: map[int64]int64{1000: 2000}, + }}, + Segments: map[int64]struct{}{ + 300: {}, + }, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v6, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v6.Version + }, 1*time.Second, 10*time.Millisecond) + + // Won't update anymore + NotifyUpdate(collectionID) + assert.Never(t, func() bool { + version := manager.GetVersion(collectionID) + return version != v6.Version + }, 100*time.Millisecond, 10*time.Millisecond) +} + +func TestNewDataViewManager_TryUpdateDataView_Failed(t *testing.T) { + manager := NewDataViewManager(nil) + go manager.Start() + defer manager.Close() + + collectionID := int64(1) + + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return nil, fmt.Errorf("mock err") + } + NotifyUpdate(collectionID) + assert.Never(t, func() bool { + version := manager.GetVersion(collectionID) + return version > InitialDataViewVersion + }, 100*time.Millisecond, 10*time.Millisecond) +} diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 86693188cef84..f0b8974471bab 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -481,5 +481,8 @@ func (h *ServerHandler) FinishDropChannel(channel string, collectionID int64) er // clean collection info cache when meet drop collection info h.s.meta.DropCollection(collectionID) + // clean data view + h.s.viewManager.Remove(collectionID) + return nil } From 0d63554e6dbf6a0e0423510698541a2e22fe0b55 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Thu, 21 Nov 2024 20:49:55 +0800 Subject: [PATCH 3/7] add more test Signed-off-by: bigsheeper --- internal/datacoord/services_test.go | 34 +++++++++++++ .../datacoord/client/client_test.go | 51 +++++++++++++++++++ .../distributed/datacoord/service_test.go | 7 +++ 3 files changed, 92 insertions(+) diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 2e120012cdff9..e44c00bb933c9 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -21,6 +21,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/broker" + "github.com/milvus-io/milvus/internal/datacoord/dataview" "github.com/milvus-io/milvus/internal/datacoord/session" 
"github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/metastore/model" @@ -790,6 +791,39 @@ func TestServer_GcConfirm(t *testing.T) { }) } +func TestGetDataViewVersions(t *testing.T) { + t.Run("server not healthy", func(t *testing.T) { + svr := newTestServer(t) + closeTestServer(t, svr) + resp, err := svr.GetDataViewVersions(context.TODO(), &datapb.GetDataViewVersionsRequest{}) + assert.NoError(t, err) + err = merr.Error(resp.GetStatus()) + assert.ErrorIs(t, err, merr.ErrServiceNotReady) + }) + + t.Run("normal", func(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + + pullFn := func(collectionID int64) (*dataview.DataView, error) { + return &dataview.DataView{ + CollectionID: collectionID, + Version: time.Now().UnixNano(), + }, nil + } + manager := dataview.NewDataViewManager(pullFn) + svr.viewManager = manager + + req := &datapb.GetDataViewVersionsRequest{ + CollectionIDs: []int64{100, 200, 300}, + } + resp, err := svr.GetDataViewVersions(context.TODO(), req) + assert.NoError(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetCode()) + assert.EqualValues(t, 3, len(resp.GetDataViewVersions())) + }) +} + func TestGetRecoveryInfoV2(t *testing.T) { t.Run("test get recovery info with no segments", func(t *testing.T) { svr := newTestServer(t) diff --git a/internal/distributed/datacoord/client/client_test.go b/internal/distributed/datacoord/client/client_test.go index c46dab7235126..125e9449fade1 100644 --- a/internal/distributed/datacoord/client/client_test.go +++ b/internal/distributed/datacoord/client/client_test.go @@ -692,6 +692,57 @@ func Test_SaveBinlogPaths(t *testing.T) { assert.ErrorIs(t, err, context.DeadlineExceeded) } +func Test_GetDataViewVersions(t *testing.T) { + paramtable.Init() + + ctx := context.Background() + client, err := NewClient(ctx) + assert.NoError(t, err) + assert.NotNil(t, client) + defer client.Close() + + mockDC := mocks.NewMockDataCoordClient(t) + mockGrpcClient := mocks.NewMockGrpcClient[datapb.DataCoordClient](t) + mockGrpcClient.EXPECT().Close().Return(nil) + mockGrpcClient.EXPECT().ReCall(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, f func(datapb.DataCoordClient) (interface{}, error)) (interface{}, error) { + return f(mockDC) + }) + client.(*Client).grpcClient = mockGrpcClient + + // test success + mockDC.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{ + Status: merr.Success(), + }, nil) + _, err = client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{}) + assert.Nil(t, err) + + // test return error status + mockDC.ExpectedCalls = nil + mockDC.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{ + Status: merr.Status(merr.ErrServiceNotReady), + }, nil) + + rsp, err := client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{}) + assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode()) + assert.Nil(t, err) + + // test return error + mockDC.ExpectedCalls = nil + mockDC.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{ + Status: merr.Success(), + }, mockErr) + + _, err = client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{}) + assert.NotNil(t, err) + + // test ctx done + ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + defer cancel() + time.Sleep(20 * time.Millisecond) + _, err = client.GetDataViewVersions(ctx, 
&datapb.GetDataViewVersionsRequest{}) + assert.ErrorIs(t, err, context.DeadlineExceeded) +} + func Test_GetRecoveryInfo(t *testing.T) { paramtable.Init() diff --git a/internal/distributed/datacoord/service_test.go b/internal/distributed/datacoord/service_test.go index fcad74a636b0f..b329760750c61 100644 --- a/internal/distributed/datacoord/service_test.go +++ b/internal/distributed/datacoord/service_test.go @@ -130,6 +130,13 @@ func Test_NewServer(t *testing.T) { assert.NotNil(t, resp) }) + t.Run("GetDataViewVersions", func(t *testing.T) { + mockDataCoord.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{}, nil) + resp, err := server.GetDataViewVersions(ctx, nil) + assert.NoError(t, err) + assert.NotNil(t, resp) + }) + t.Run("GetRecoveryInfo", func(t *testing.T) { mockDataCoord.EXPECT().GetRecoveryInfo(mock.Anything, mock.Anything).Return(&datapb.GetRecoveryInfoResponse{}, nil) resp, err := server.GetRecoveryInfo(ctx, nil) From d0d096bb3dca28f13c9aa79dbf6b4bde34cfb57f Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 26 Nov 2024 15:12:07 +0800 Subject: [PATCH 4/7] add info Signed-off-by: bigsheeper --- internal/datacoord/dataview/view_manager.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/internal/datacoord/dataview/view_manager.go b/internal/datacoord/dataview/view_manager.go index f5638f4e277b1..1c279c6edd123 100644 --- a/internal/datacoord/dataview/view_manager.go +++ b/internal/datacoord/dataview/view_manager.go @@ -25,6 +25,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -110,9 +111,9 @@ func (m *dataViewManager) Close() { }) } -func (m *dataViewManager) update(view *DataView) { +func (m *dataViewManager) update(view *DataView, reason string) { m.currentViews.Insert(view.CollectionID, view) - log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version)) + log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version), zap.String("reason", reason)) } func (m *dataViewManager) TryUpdateDataView(collectionID int64) { @@ -127,7 +128,7 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) { currentView, ok := m.currentViews.Get(collectionID) if !ok { // update due to data view is empty - m.update(newView) + m.update(newView, "init data view") return } // no-op if the incoming version is less than the current version. 
@@ -141,7 +142,7 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
 		current, ok := currentView.Channels[channel]
 		if !ok {
 			// update due to channel info is empty
-			m.update(newView)
+			m.update(newView, "init channel info")
 			return
 		}
 		if !funcutil.SliceSetEqual(new.GetLevelZeroSegmentIds(), current.GetLevelZeroSegmentIds()) ||
@@ -150,24 +151,26 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
 			!funcutil.SliceSetEqual(new.GetIndexedSegmentIds(), current.GetIndexedSegmentIds()) ||
 			!funcutil.SliceSetEqual(new.GetDroppedSegmentIds(), current.GetDroppedSegmentIds()) {
 			// update due to segments list changed
-			m.update(newView)
+			m.update(newView, "channel segments list changed")
 			return
 		}
 		if !typeutil.MapEqual(new.GetPartitionStatsVersions(), current.GetPartitionStatsVersions()) {
 			// update due to partition stats changed
-			m.update(newView)
+			m.update(newView, "partition stats changed")
 			return
 		}
 		// TODO: It might be too frequent.
-		if new.GetSeekPosition().GetTimestamp() > current.GetSeekPosition().GetTimestamp() {
+		newTime := tsoutil.PhysicalTime(new.GetSeekPosition().GetTimestamp())
+		curTime := tsoutil.PhysicalTime(current.GetSeekPosition().GetTimestamp())
+		if newTime.Sub(curTime) > time.Second*600 {
 			// update due to channel cp advanced
-			m.update(newView)
+			m.update(newView, "channel cp advanced")
 			return
 		}
 	}
 	if !typeutil.MapEqual(newView.Segments, currentView.Segments) {
 		// update due to segments list changed
-		m.update(newView)
+		m.update(newView, "segment list changed")
 	}
 }

From 5d29848e8ee2ab33a517f9477fd49efd6a671264 Mon Sep 17 00:00:00 2001
From: bigsheeper
Date: Tue, 26 Nov 2024 15:55:31 +0800
Subject: [PATCH 5/7] make cp interval configurable

Signed-off-by: bigsheeper
---
 internal/datacoord/dataview/view_manager.go |  3 +--
 pkg/util/paramtable/component_param.go      | 13 ++++++++++++-
 pkg/util/paramtable/component_param_test.go |  2 ++
 3 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/internal/datacoord/dataview/view_manager.go b/internal/datacoord/dataview/view_manager.go
index 1c279c6edd123..899423f5bd2cd 100644
--- a/internal/datacoord/dataview/view_manager.go
+++ b/internal/datacoord/dataview/view_manager.go
@@ -159,10 +159,9 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
 		m.update(newView, "partition stats changed")
 		return
 	}
-	// TODO: It might be too frequent.
newTime := tsoutil.PhysicalTime(new.GetSeekPosition().GetTimestamp()) curTime := tsoutil.PhysicalTime(current.GetSeekPosition().GetTimestamp()) - if newTime.Sub(curTime) > time.Second*600 { + if newTime.Sub(curTime) > paramtable.Get().DataCoordCfg.CPIntervalToUpdateDataView.GetAsDuration(time.Second) { // update due to channel cp advanced m.update(newView, "channel cp advanced") return diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c150dcc6dfaa9..76ec711f5525e 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3320,7 +3320,8 @@ type dataCoordConfig struct { TaskCheckInterval ParamItem `refreshable:"true"` // data view - DataViewUpdateInterval ParamItem `refreshable:"true"` + DataViewUpdateInterval ParamItem `refreshable:"true"` + CPIntervalToUpdateDataView ParamItem `refreshable:"true"` } func (p *dataCoordConfig) init(base *BaseTable) { @@ -4204,6 +4205,16 @@ During compaction, the size of segment # of rows is able to exceed segment max # Export: false, } p.DataViewUpdateInterval.Init(base.mgr) + + p.CPIntervalToUpdateDataView = ParamItem{ + Key: "dataCoord.dataView.cpInterval", + Version: "2.5.0", + Doc: "cpInterval is a time interval in seconds. If the time interval between the new channel checkpoint and the current channel checkpoint exceeds cpInterval, it will trigger a data view update.", + DefaultValue: "600", + PanicIfEmpty: false, + Export: false, + } + p.CPIntervalToUpdateDataView.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 529ec7a2d969b..145c24e1d3142 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -516,6 +516,8 @@ func TestComponentParam(t *testing.T) { params.Save("datacoord.scheduler.taskSlowThreshold", "1000") assert.Equal(t, 1000*time.Second, Params.TaskSlowThreshold.GetAsDuration(time.Second)) assert.Equal(t, 32, Params.MaxConcurrentChannelTaskNumPerDN.GetAsInt()) + assert.Equal(t, 10*time.Second, Params.DataViewUpdateInterval.GetAsDuration(time.Second)) + assert.Equal(t, 600*time.Second, Params.CPIntervalToUpdateDataView.GetAsDuration(time.Second)) }) t.Run("test dataNodeConfig", func(t *testing.T) { From ce6466c125ef70bdcaec5a7f7dd81702a2fb83d6 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 29 Nov 2024 17:39:44 +0800 Subject: [PATCH 6/7] fix conflicts Signed-off-by: bigsheeper --- internal/datacoord/services.go | 2 +- internal/querycoordv2/observers/target_observer.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 15e227a7a955c..4de26ba473203 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -971,7 +971,7 @@ func (s *Server) pullNewDataView(collectionID int64) (*dataview.DataView, error) segments := make([]int64, 0) for id := range flushedIDs { - segment := s.meta.GetSegment(id) + segment := s.meta.GetSegment(context.TODO(), id) if segment == nil { err := merr.WrapErrSegmentNotFound(id) log.Warn("failed to get segment", zap.Int64("segmentID", id)) diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index fe86f4803de88..11bf131533b2c 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ 
-228,7 +228,7 @@ func (ob *TargetObserver) schedule(ctx context.Context) { } func (ob *TargetObserver) GetVersionUpdatedCollections() []int64 { - loaded := lo.FilterMap(ob.meta.GetAllCollections(ctx), func(collection *meta.Collection, _ int) (int64, bool) { + loaded := lo.FilterMap(ob.meta.GetAllCollections(context.TODO()), func(collection *meta.Collection, _ int) (int64, bool) { if collection.GetStatus() == querypb.LoadStatus_Loaded { return collection.GetCollectionID(), true } From c010808f4213fa97fd512e6526201f1cf9376f56 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 4 Dec 2024 19:43:40 +0800 Subject: [PATCH 7/7] fix ut Signed-off-by: bigsheeper --- internal/datacoord/dataview/view_manager.go | 3 ++- internal/datacoord/dataview/view_manager_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/datacoord/dataview/view_manager.go b/internal/datacoord/dataview/view_manager.go index 899423f5bd2cd..f7cc17fa6ad39 100644 --- a/internal/datacoord/dataview/view_manager.go +++ b/internal/datacoord/dataview/view_manager.go @@ -17,6 +17,7 @@ package dataview import ( + "fmt" "sync" "time" @@ -163,7 +164,7 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) { curTime := tsoutil.PhysicalTime(current.GetSeekPosition().GetTimestamp()) if newTime.Sub(curTime) > paramtable.Get().DataCoordCfg.CPIntervalToUpdateDataView.GetAsDuration(time.Second) { // update due to channel cp advanced - m.update(newView, "channel cp advanced") + m.update(newView, fmt.Sprintf("channel cp advanced, curTime=%v, newTime=%v", curTime, newTime)) return } } diff --git a/internal/datacoord/dataview/view_manager_test.go b/internal/datacoord/dataview/view_manager_test.go index afd41a6b7bd5e..e2946472bcb5e 100644 --- a/internal/datacoord/dataview/view_manager_test.go +++ b/internal/datacoord/dataview/view_manager_test.go @@ -149,7 +149,7 @@ func TestNewDataViewManager_TryUpdateDataView(t *testing.T) { CollectionID: collectionID, ChannelName: "ch0", SeekPosition: &msgpb.MsgPosition{ - Timestamp: uint64(time.Now().UnixNano()), + Timestamp: uint64(time.Now().Add(paramtable.Get().DataCoordCfg.CPIntervalToUpdateDataView.GetAsDuration(time.Second)).UnixNano()), }, UnflushedSegmentIds: []int64{100, 200}, PartitionStatsVersions: map[int64]int64{1000: 2000},