enhance: [10kcp] Simplify querynode tsafe & reduce goroutine number (#38416) (#38433)

Related to #37630

The TSafe manager is too complex for the current implementation, and each delegator needs a dedicated goroutine waiting for tsafe update events.

Tsafe updates can instead be applied from within the pipeline. This PR removes the tsafe manager and simplifies the entire tsafe update logic.
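
A rough sketch of the intended call pattern (illustrative only — the type and field names below are hypothetical, not the actual Milvus pipeline types; the real wiring lives in the pipeline files of this commit): the pipeline node consuming the DML stream pushes the timestamp straight into the delegator, so no per-delegator watcher goroutine is needed.

// Hypothetical pipeline node; names are illustrative, not the real pipeline types.
type tsUpdateNode struct {
    delegator ShardDelegator
}

// Operate forwards the end timestamp of each consumed message pack directly to
// the delegator, replacing the old tsafeManager.Set + watchTSafe round trip.
func (n *tsUpdateNode) Operate(endTs uint64) {
    n.delegator.UpdateTSafe(endTs)
}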

Signed-off-by: Congqi Xia <[email protected]>
congqixia authored Dec 13, 2024
1 parent de78de7 commit 28841eb
Showing 21 changed files with 182 additions and 571 deletions.
45 changes: 10 additions & 35 deletions internal/querynodev2/delegator/delegator.go
@@ -41,7 +41,6 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/optimizers"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
@@ -87,6 +86,10 @@ type ShardDelegator interface {
VerifyExcludedSegments(segmentID int64, ts uint64) bool
TryCleanExcludedSegments(ts uint64)

// tsafe
UpdateTSafe(ts uint64)
GetTSafe() uint64

// control
Serviceable() bool
Start()
@@ -111,7 +114,6 @@ type shardDelegator struct {

distribution *distribution
segmentManager segments.SegmentManager
tsafeManager tsafe.Manager
pkOracle pkoracle.PkOracle
level0Mut sync.RWMutex
// stream delete buffer
@@ -751,43 +753,20 @@ func (sd *shardDelegator) waitTSafe(ctx context.Context, ts uint64) (uint64, err
}
}

// watchTSafe is the worker function to update serviceable timestamp.
func (sd *shardDelegator) watchTSafe() {
defer sd.lifetime.Done()
listener := sd.tsafeManager.WatchChannel(sd.vchannelName)
sd.updateTSafe()
log := sd.getLogger(context.Background())
for {
select {
case _, ok := <-listener.On():
if !ok {
// listener close
log.Warn("tsafe listener closed")
return
}
sd.updateTSafe()
case <-sd.lifetime.CloseCh():
log.Info("updateTSafe quit")
// shard delegator closed
return
}
}
}

// updateTSafe read current tsafe value from tsafeManager.
func (sd *shardDelegator) updateTSafe() {
func (sd *shardDelegator) UpdateTSafe(tsafe uint64) {
sd.tsCond.L.Lock()
tsafe, err := sd.tsafeManager.Get(sd.vchannelName)
if err != nil {
log.Warn("tsafeManager failed to get lastest", zap.Error(err))
}
if tsafe > sd.latestTsafe.Load() {
sd.latestTsafe.Store(tsafe)
sd.tsCond.Broadcast()
}
sd.tsCond.L.Unlock()
}

func (sd *shardDelegator) GetTSafe() uint64 {
return sd.latestTsafe.Load()
}

// Close closes the delegator.
func (sd *shardDelegator) Close() {
sd.lifetime.SetState(lifetime.Stopped)
@@ -849,7 +828,7 @@ func (sd *shardDelegator) loadPartitionStats(ctx context.Context, partStatsVersi

// NewShardDelegator creates a new ShardDelegator instance with all fields initialized.
func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID UniqueID, channel string, version int64,
workerManager cluster.Manager, manager *segments.Manager, tsafeManager tsafe.Manager, loader segments.Loader,
workerManager cluster.Manager, manager *segments.Manager, loader segments.Loader,
factory msgstream.Factory, startTs uint64, queryHook optimizers.QueryHook, chunkManager storage.ChunkManager,
) (ShardDelegator, error) {
log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID),
@@ -885,7 +864,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock,
[]string{fmt.Sprint(paramtable.GetNodeID()), channel}),
pkOracle: pkoracle.NewPkOracle(),
tsafeManager: tsafeManager,
latestTsafe: atomic.NewUint64(startTs),
loader: loader,
factory: factory,
@@ -898,9 +876,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
}
m := sync.Mutex{}
sd.tsCond = sync.NewCond(&m)
if sd.lifetime.Add(lifetime.NotStopped) == nil {
go sd.watchTSafe()
}
log.Info("finish build new shardDelegator")
return sd, nil
}
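
For context, the waiting side that the new UpdateTSafe wakes up looks roughly like the sketch below (simplified, with a hypothetical helper name; the real waitTSafe, visible only as a hunk header above, also handles context cancellation and delegator shutdown):

// Simplified shape of a waiter paired with UpdateTSafe: it blocks on the same
// sync.Cond until the latest tsafe reaches the requested timestamp.
func (sd *shardDelegator) waitUntil(ts uint64) uint64 {
    sd.tsCond.L.Lock()
    defer sd.tsCond.L.Unlock()
    for sd.latestTsafe.Load() < ts {
        sd.tsCond.Wait() // released by the Broadcast in UpdateTSafe
    }
    return sd.latestTsafe.Load()
}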
6 changes: 1 addition & 5 deletions internal/querynodev2/delegator/delegator_data_test.go
@@ -39,7 +39,6 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/bloomfilter"
"github.com/milvus-io/milvus/internal/util/initcore"
@@ -62,7 +61,6 @@ type DelegatorDataSuite struct {
version int64
workerManager *cluster.MockManager
manager *segments.Manager
tsafeManager tsafe.Manager
loader *segments.MockLoader
mq *msgstream.MockMsgStream
channel metautil.Channel
@@ -98,7 +96,6 @@ func (s *DelegatorDataSuite) TearDownSuite() {
func (s *DelegatorDataSuite) SetupTest() {
s.workerManager = &cluster.MockManager{}
s.manager = segments.NewManager()
s.tsafeManager = tsafe.NewTSafeReplica()
s.loader = &segments.MockLoader{}

// init schema
@@ -159,7 +156,7 @@ func (s *DelegatorDataSuite) SetupTest() {
s.rootPath = s.Suite.T().Name()
chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath)
s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},
@@ -654,7 +651,6 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
s.version,
s.workerManager,
s.manager,
s.tsafeManager,
s.loader,
&msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
79 changes: 1 addition & 78 deletions internal/querynodev2/delegator/delegator_test.go
@@ -19,17 +19,13 @@ package delegator
import (
"context"
"io"
"sync"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@@ -39,13 +35,11 @@ import (
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/streamrpc"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -61,7 +55,6 @@ type DelegatorSuite struct {
version int64
workerManager *cluster.MockManager
manager *segments.Manager
tsafeManager tsafe.Manager
loader *segments.MockLoader
mq *msgstream.MockMsgStream

@@ -85,7 +78,6 @@ func (s *DelegatorSuite) SetupTest() {
s.version = 2000
s.workerManager = &cluster.MockManager{}
s.manager = segments.NewManager()
s.tsafeManager = tsafe.NewTSafeReplica()
s.loader = &segments.MockLoader{}
s.loader.EXPECT().
Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything).
@@ -165,7 +157,7 @@ func (s *DelegatorSuite) SetupTest() {

var err error
// s.delegator, err = NewShardDelegator(s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader)
s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
s.delegator, err = NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},
@@ -1120,72 +1112,3 @@ func (s *DelegatorSuite) TestGetStats() {
func TestDelegatorSuite(t *testing.T) {
suite.Run(t, new(DelegatorSuite))
}

func TestDelegatorWatchTsafe(t *testing.T) {
channelName := "default_dml_channel"

tsafeManager := tsafe.NewTSafeReplica()
tsafeManager.Add(context.Background(), channelName, 100)
sd := &shardDelegator{
tsafeManager: tsafeManager,
vchannelName: channelName,
lifetime: lifetime.NewLifetime(lifetime.Initializing),
latestTsafe: atomic.NewUint64(0),
}
defer sd.Close()

m := sync.Mutex{}
sd.tsCond = sync.NewCond(&m)
if sd.lifetime.Add(lifetime.NotStopped) == nil {
go sd.watchTSafe()
}

err := tsafeManager.Set(channelName, 200)
require.NoError(t, err)

assert.Eventually(t, func() bool {
return sd.latestTsafe.Load() == 200
}, time.Second*10, time.Millisecond*10)
}

func TestDelegatorTSafeListenerClosed(t *testing.T) {
channelName := "default_dml_channel"

tsafeManager := tsafe.NewTSafeReplica()
tsafeManager.Add(context.Background(), channelName, 100)
sd := &shardDelegator{
tsafeManager: tsafeManager,
vchannelName: channelName,
lifetime: lifetime.NewLifetime(lifetime.Initializing),
latestTsafe: atomic.NewUint64(0),
}
defer sd.Close()

m := sync.Mutex{}
sd.tsCond = sync.NewCond(&m)
signal := make(chan struct{})
if sd.lifetime.Add(lifetime.NotStopped) == nil {
go func() {
sd.watchTSafe()
close(signal)
}()
}

select {
case <-signal:
assert.FailNow(t, "watchTsafe quit unexpectedly")
case <-time.After(time.Millisecond * 10):
}

tsafeManager.Remove(context.Background(), channelName)

select {
case <-signal:
case <-time.After(time.Second):
assert.FailNow(t, "watchTsafe still working after listener closed")
}

sd.Close()
assert.Equal(t, sd.Serviceable(), false)
assert.Equal(t, sd.Stopped(), true)
}
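
The two watcher tests above are removed together with the goroutine they exercised. A replacement unit test for the new methods is not part of this diff, but a minimal sketch using the same in-package struct-literal setup as the removed tests could look like this (it would need the sync, go.uber.org/atomic, and testify/assert imports the old tests used):

// Hypothetical test, not included in this commit: UpdateTSafe must only move
// the latest tsafe forward, and GetTSafe must observe the new value.
func TestDelegatorUpdateTSafe(t *testing.T) {
    sd := &shardDelegator{
        vchannelName: "default_dml_channel",
        latestTsafe:  atomic.NewUint64(100),
    }
    m := sync.Mutex{}
    sd.tsCond = sync.NewCond(&m)

    // A lower timestamp must not move tsafe backwards.
    sd.UpdateTSafe(50)
    assert.Equal(t, uint64(100), sd.GetTSafe())

    // A higher timestamp advances it and wakes any waiters.
    sd.UpdateTSafe(200)
    assert.Equal(t, uint64(200), sd.GetTSafe())
}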
5 changes: 1 addition & 4 deletions internal/querynodev2/delegator/delta_forward_test.go
@@ -31,7 +31,6 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
@@ -51,7 +50,6 @@ type StreamingForwardSuite struct {
version int64
workerManager *cluster.MockManager
manager *segments.Manager
tsafeManager tsafe.Manager
loader *segments.MockLoader
mq *msgstream.MockMsgStream

@@ -73,7 +71,6 @@ func (s *StreamingForwardSuite) SetupTest() {
s.version = 2000
s.workerManager = &cluster.MockManager{}
s.manager = segments.NewManager()
s.tsafeManager = tsafe.NewTSafeReplica()
s.loader = &segments.MockLoader{}
s.loader.EXPECT().
Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything).
@@ -151,7 +148,7 @@ func (s *StreamingForwardSuite) SetupTest() {
chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath)
s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())

delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},
