Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Reduce GetRecoveryInfo calls #37863

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1960dec
enhance: Reduce GetRecoveryInfo calls from querycoord
bigsheeper Nov 13, 2024
e18a8fc
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Nov 21, 2024
da09929
add more test
bigsheeper Nov 21, 2024
ec50719
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Nov 21, 2024
0d63554
add more test
bigsheeper Nov 21, 2024
5689fc7
fix conflicts
bigsheeper Nov 26, 2024
185ccf5
Merge branch 'master' of github.com:milvus-io/milvus into 2411-dc-ver…
bigsheeper Nov 26, 2024
d0d096b
add info
bigsheeper Nov 26, 2024
de50974
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Nov 26, 2024
5e83c57
Merge branch '2411-dc-version' of github.com:bigsheeper/milvus into 2…
bigsheeper Nov 26, 2024
5d29848
make configuarable
bigsheeper Nov 26, 2024
207f47f
fix conflicts
bigsheeper Nov 28, 2024
0351f94
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Nov 29, 2024
ce6466c
fix conflicts
bigsheeper Nov 29, 2024
7ba3dda
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Dec 3, 2024
c194485
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Dec 4, 2024
c010808
fix ut
bigsheeper Dec 4, 2024
e8dc8ee
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Dec 9, 2024
f226d77
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Dec 12, 2024
4770d1b
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Dec 13, 2024
0147eb6
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
bigsheeper Dec 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions internal/datacoord/dataview/data_view.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataview

import "github.com/milvus-io/milvus/internal/proto/datapb"

const InitialDataViewVersion int64 = 0

type DataView struct {
CollectionID int64
Channels map[string]*datapb.VchannelInfo
Segments map[int64]struct{}
Version int64
}
33 changes: 33 additions & 0 deletions internal/datacoord/dataview/update_chan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataview

import "sync"

var updateChan chan int64
var initOnce sync.Once

func initUpdateChan() {
initOnce.Do(func() {
updateChan = make(chan int64, 1024)
})
}

// NotifyUpdate used to trigger updating data view immediately.
func NotifyUpdate(collectionID int64) {
updateChan <- collectionID
}
176 changes: 176 additions & 0 deletions internal/datacoord/dataview/view_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataview

import (
"fmt"
"sync"
"time"

"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type PullNewDataViewFunction func(collectionID int64) (*DataView, error)

type ViewManager interface {
Get(collectionID int64) (*DataView, error)
GetVersion(collectionID int64) int64
Remove(collectionID int64)

Start()
Close()
}

type dataViewManager struct {
pullFn PullNewDataViewFunction
currentViews *typeutil.ConcurrentMap[int64, *DataView]

closeOnce sync.Once
closeChan chan struct{}
}

func NewDataViewManager(pullFn PullNewDataViewFunction) ViewManager {
initUpdateChan()
return &dataViewManager{
pullFn: pullFn,
currentViews: typeutil.NewConcurrentMap[int64, *DataView](),
closeChan: make(chan struct{}),
}
}

func (m *dataViewManager) Get(collectionID int64) (*DataView, error) {
if view, ok := m.currentViews.Get(collectionID); ok {
return view, nil
}
view, err := m.pullFn(collectionID)
if err != nil {
return nil, err
}

Check warning on line 68 in internal/datacoord/dataview/view_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/dataview/view_manager.go#L67-L68

Added lines #L67 - L68 were not covered by tests

v, ok := m.currentViews.GetOrInsert(collectionID, view)
if !ok {
log.Info("update new data view", zap.Int64("collectionID", collectionID), zap.Int64("version", view.Version))
}
return v, nil
}

func (m *dataViewManager) GetVersion(collectionID int64) int64 {
if view, ok := m.currentViews.Get(collectionID); ok {
return view.Version
}
return InitialDataViewVersion
}

func (m *dataViewManager) Remove(collectionID int64) {
if view, ok := m.currentViews.GetAndRemove(collectionID); ok {
log.Info("data view removed", zap.Int64("collectionID", collectionID), zap.Int64("version", view.Version))
}

Check warning on line 87 in internal/datacoord/dataview/view_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/dataview/view_manager.go#L84-L87

Added lines #L84 - L87 were not covered by tests
}

func (m *dataViewManager) Start() {
ticker := time.NewTicker(paramtable.Get().DataCoordCfg.DataViewUpdateInterval.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-m.closeChan:
log.Info("data view manager exited")
return
case <-ticker.C:
// periodically update all data view
for _, collectionID := range m.currentViews.Keys() {
m.TryUpdateDataView(collectionID)
}
case collectionID := <-updateChan:
m.TryUpdateDataView(collectionID)
}
}
}

func (m *dataViewManager) Close() {
m.closeOnce.Do(func() {
close(m.closeChan)
})
}

func (m *dataViewManager) update(view *DataView, reason string) {
m.currentViews.Insert(view.CollectionID, view)
log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version), zap.String("reason", reason))
}

func (m *dataViewManager) TryUpdateDataView(collectionID int64) {
newView, err := m.pullFn(collectionID)
if err != nil {
log.Warn("pull new data view failed", zap.Int64("collectionID", collectionID), zap.Error(err))
// notify to trigger retry
NotifyUpdate(collectionID)
return
}

currentView, ok := m.currentViews.Get(collectionID)
if !ok {
// update due to data view is empty
m.update(newView, "init data view")
return
}
// no-op if the incoming version is less than the current version.
if newView.Version <= currentView.Version {
log.Warn("stale version, skip update", zap.Int64("collectionID", collectionID),
zap.Int64("new", newView.Version), zap.Int64("current", currentView.Version))
return
}

for channel, new := range newView.Channels {
current, ok := currentView.Channels[channel]
if !ok {
// update due to channel info is empty
m.update(newView, "init channel info")
return
}
if !funcutil.SliceSetEqual(new.GetLevelZeroSegmentIds(), current.GetLevelZeroSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetUnflushedSegmentIds(), current.GetUnflushedSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetFlushedSegmentIds(), current.GetFlushedSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetIndexedSegmentIds(), current.GetIndexedSegmentIds()) ||
!funcutil.SliceSetEqual(new.GetDroppedSegmentIds(), current.GetDroppedSegmentIds()) {
// update due to segments list changed
m.update(newView, "channel segments list changed")
return
}
if !typeutil.MapEqual(new.GetPartitionStatsVersions(), current.GetPartitionStatsVersions()) {
// update due to partition stats changed
m.update(newView, "partition stats changed")
return
}
newTime := tsoutil.PhysicalTime(new.GetSeekPosition().GetTimestamp())
curTime := tsoutil.PhysicalTime(current.GetSeekPosition().GetTimestamp())
if newTime.Sub(curTime) > paramtable.Get().DataCoordCfg.CPIntervalToUpdateDataView.GetAsDuration(time.Second) {
// update due to channel cp advanced
m.update(newView, fmt.Sprintf("channel cp advanced, curTime=%v, newTime=%v", curTime, newTime))
return
}
}

if !typeutil.MapEqual(newView.Segments, currentView.Segments) {
// update due to segments list changed
m.update(newView, "segment list changed")
}
}
Loading
Loading