From 4ba0ed31788f33d658e71a6aa13e3db610eef40b Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Fri, 27 Dec 2024 15:08:50 +0800 Subject: [PATCH] enhance: speed up streaming barrier timetick (#38787) issue: #38399 - use last allocate but not last confirmed id to make barrier. - move the barrier logic into the timetick allocator. - try to sync up local allocator and remote allocator when first barrier check not pass to speed up. Signed-off-by: chyezh --- .../server/resource/idalloc/allocator.go | 82 ++++++++++++++++--- .../server/resource/idalloc/allocator_test.go | 49 +++++++++++ .../resource/idalloc/basic_allocator.go | 5 +- .../wal/interceptors/timetick/ack/manager.go | 31 +++---- 4 files changed, 135 insertions(+), 32 deletions(-) diff --git a/internal/streamingnode/server/resource/idalloc/allocator.go b/internal/streamingnode/server/resource/idalloc/allocator.go index f614d6f5ec3d6..1a296891f656d 100644 --- a/internal/streamingnode/server/resource/idalloc/allocator.go +++ b/internal/streamingnode/server/resource/idalloc/allocator.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/util/syncutil" ) @@ -33,7 +35,7 @@ var _ Allocator = (*allocatorImpl)(nil) // NewTSOAllocator creates a new allocator. func NewTSOAllocator(rc *syncutil.Future[types.RootCoordClient]) Allocator { return &allocatorImpl{ - mu: sync.Mutex{}, + cond: syncutil.NewContextCond(&sync.Mutex{}), remoteAllocator: newTSOAllocator(rc), localAllocator: newLocalAllocator(), } @@ -42,7 +44,7 @@ func NewTSOAllocator(rc *syncutil.Future[types.RootCoordClient]) Allocator { // NewIDAllocator creates a new allocator. 
func NewIDAllocator(rc *syncutil.Future[types.RootCoordClient]) Allocator { return &allocatorImpl{ - mu: sync.Mutex{}, + cond: syncutil.NewContextCond(&sync.Mutex{}), remoteAllocator: newIDAllocator(rc), localAllocator: newLocalAllocator(), } @@ -56,6 +58,9 @@ type Allocator interface { // Allocate allocates a timestamp. Allocate(ctx context.Context) (uint64, error) + // BarrierUntil makes a barrier; the next Allocate call will generate an id greater than the barrier. + BarrierUntil(ctx context.Context, barrier uint64) error + // Sync expire the local allocator messages, // syncs the local allocator and remote allocator. Sync() @@ -65,37 +70,92 @@ type Allocator interface { } type allocatorImpl struct { - mu sync.Mutex + cond *syncutil.ContextCond remoteAllocator remoteBatchAllocator lastSyncTime time.Time + lastAllocated uint64 localAllocator *localAllocator } -// AllocateOne allocates a timestamp. func (ta *allocatorImpl) Allocate(ctx context.Context) (uint64, error) { - ta.mu.Lock() - defer ta.mu.Unlock() + ta.cond.L.Lock() + defer ta.cond.L.Unlock() + + return ta.allocateOne(ctx) +} + +func (ta *allocatorImpl) BarrierUntil(ctx context.Context, barrier uint64) error { + err := ta.barrierFastPath(ctx, barrier) + if err == nil { + return nil + } + if !errors.Is(err, errFastPathFailed) { + return err + } + + // Fall back to the slow path to avoid blocking other id allocation operations. + ta.cond.L.Lock() + for ta.lastAllocated < barrier { + if err := ta.cond.Wait(ctx); err != nil { + return err + } + } + ta.cond.L.Unlock() + return nil +} + +func (ta *allocatorImpl) barrierFastPath(ctx context.Context, barrier uint64) error { + ta.cond.L.Lock() + defer ta.cond.L.Unlock() + + for i := 0; i < 2; i++ { + id, err := ta.allocateOne(ctx) + if err != nil { + return err + } + + // check if the allocated id has reached the barrier (id >= barrier). + if id >= barrier { + return nil + } + if i == 0 { + // force a sync-up of the local allocator and remote allocator on the first attempt. 
+ // It's the fast path if the barrier is allocated from same remote allocator. + ta.localAllocator.exhausted() + } + } + return errFastPathFailed +} +func (ta *allocatorImpl) allocateOne(ctx context.Context) (uint64, error) { // allocate one from local allocator first. if id, err := ta.localAllocator.allocateOne(); err == nil { + ta.lastAllocated = id + ta.cond.UnsafeBroadcast() return id, nil } // allocate from remote. - return ta.allocateRemote(ctx) + id, err := ta.allocateRemote(ctx) + if err != nil { + return 0, err + } + ta.lastAllocated = id + ta.cond.UnsafeBroadcast() + return id, nil } // Sync expire the local allocator messages, // syncs the local allocator and remote allocator. func (ta *allocatorImpl) Sync() { - ta.mu.Lock() - defer ta.mu.Unlock() + ta.cond.L.Lock() + defer ta.cond.L.Unlock() ta.localAllocator.exhausted() } func (ta *allocatorImpl) SyncIfExpired(expire time.Duration) { - ta.mu.Lock() - defer ta.mu.Unlock() + ta.cond.L.Lock() + defer ta.cond.L.Unlock() if time.Since(ta.lastSyncTime) > expire { ta.localAllocator.exhausted() diff --git a/internal/streamingnode/server/resource/idalloc/allocator_test.go b/internal/streamingnode/server/resource/idalloc/allocator_test.go index 26eb9e90c2b1a..305c917c5be74 100644 --- a/internal/streamingnode/server/resource/idalloc/allocator_test.go +++ b/internal/streamingnode/server/resource/idalloc/allocator_test.go @@ -58,3 +58,52 @@ func TestTimestampAllocator(t *testing.T) { _, err := allocator.Allocate(context.Background()) assert.Error(t, err) } + +func TestIDAllocator(t *testing.T) { + paramtable.Init() + paramtable.SetNodeID(1) + + client := NewMockRootCoordClient(t) + f := syncutil.NewFuture[types.RootCoordClient]() + f.Set(client) + + allocator := NewIDAllocator(f) + + // Make local dirty + allocator.Allocate(context.Background()) + // Test barrier fast path. 
+ resp, err := client.AllocID(context.Background(), &rootcoordpb.AllocIDRequest{ + Count: 100, + }) + assert.NoError(t, err) + err = allocator.BarrierUntil(context.Background(), uint64(resp.ID)) + assert.NoError(t, err) + newBarrierTimeTick, err := allocator.Allocate(context.Background()) + assert.NoError(t, err) + assert.Greater(t, newBarrierTimeTick, uint64(resp.ID)) + + // Test slow path. + ch := make(chan struct{}) + go func() { + barrier := newBarrierTimeTick + 1*batchAllocateSize + err := allocator.BarrierUntil(context.Background(), barrier) + assert.NoError(t, err) + newBarrierTimeTick, err := allocator.Allocate(context.Background()) + assert.NoError(t, err) + assert.Greater(t, newBarrierTimeTick, barrier) + close(ch) + }() + select { + case <-ch: + assert.Fail(t, "should not finish") + case <-time.After(time.Millisecond * 20): + } + allocator.Sync() + _, err = allocator.Allocate(context.Background()) + assert.NoError(t, err) + <-ch + + allocator.SyncIfExpired(time.Millisecond * 50) + time.Sleep(time.Millisecond * 10) + allocator.SyncIfExpired(time.Millisecond * 10) +} diff --git a/internal/streamingnode/server/resource/idalloc/basic_allocator.go b/internal/streamingnode/server/resource/idalloc/basic_allocator.go index 8b9e220cc410a..ad49c5192a03b 100644 --- a/internal/streamingnode/server/resource/idalloc/basic_allocator.go +++ b/internal/streamingnode/server/resource/idalloc/basic_allocator.go @@ -15,7 +15,10 @@ import ( "github.com/milvus-io/milvus/pkg/util/syncutil" ) -var errExhausted = errors.New("exhausted") +var ( + errExhausted = errors.New("exhausted") + errFastPathFailed = errors.New("fast path failed") +) // newLocalAllocator creates a new local allocator. 
func newLocalAllocator() *localAllocator { diff --git a/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go b/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go index 362c90cb526a6..7b93365cbfcfe 100644 --- a/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go +++ b/internal/streamingnode/server/wal/interceptors/timetick/ack/manager.go @@ -8,13 +8,12 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/server/resource" "github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil" "github.com/milvus-io/milvus/pkg/streaming/util/message" - "github.com/milvus-io/milvus/pkg/util/syncutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) // AckManager manages the timestampAck. type AckManager struct { - cond *syncutil.ContextCond + mu sync.Mutex lastAllocatedTimeTick uint64 // The last allocated time tick, the latest timestamp allocated by the allocator. lastConfirmedTimeTick uint64 // The last confirmed time tick, the message which time tick less than lastConfirmedTimeTick has been committed into wal. notAckHeap typeutil.Heap[*Acker] // A minimum heap of timestampAck to search minimum allocated but not ack timestamp in list. @@ -34,7 +33,7 @@ func NewAckManager( metrics *metricsutil.TimeTickMetrics, ) *AckManager { return &AckManager{ - cond: syncutil.NewContextCond(&sync.Mutex{}), + mu: sync.Mutex{}, lastAllocatedTimeTick: 0, notAckHeap: typeutil.NewHeap[*Acker](&ackersOrderByTimestamp{}), ackHeap: typeutil.NewHeap[*Acker](&ackersOrderByEndTimestamp{}), @@ -46,23 +45,18 @@ func NewAckManager( // AllocateWithBarrier allocates a timestamp with a barrier. func (ta *AckManager) AllocateWithBarrier(ctx context.Context, barrierTimeTick uint64) (*Acker, error) { - // wait until the lastConfirmedTimeTick is greater than barrierTimeTick. 
- ta.cond.L.Lock() - for ta.lastConfirmedTimeTick <= barrierTimeTick { - if err := ta.cond.Wait(ctx); err != nil { - return nil, err - } + // Just make a barrier to the underlying allocator. + if err := resource.Resource().TSOAllocator().BarrierUntil(ctx, barrierTimeTick); err != nil { + return nil, err } - ta.cond.L.Unlock() - return ta.Allocate(ctx) } // Allocate allocates a timestamp. // Concurrent safe to call with Sync and Allocate. func (ta *AckManager) Allocate(ctx context.Context) (*Acker, error) { - ta.cond.L.Lock() - defer ta.cond.L.Unlock() + ta.mu.Lock() + defer ta.mu.Unlock() // allocate one from underlying allocator first. ts, err := resource.Resource().TSOAllocator().Allocate(ctx) @@ -97,8 +91,8 @@ func (ta *AckManager) SyncAndGetAcknowledged(ctx context.Context) ([]*AckDetail, } tsWithAck.Ack(OptSync()) - ta.cond.L.Lock() - defer ta.cond.L.Unlock() + ta.mu.Lock() + defer ta.mu.Unlock() details := ta.acknowledgedDetails ta.acknowledgedDetails = make(sortedDetails, 0, 5) @@ -107,8 +101,8 @@ func (ta *AckManager) SyncAndGetAcknowledged(ctx context.Context) ([]*AckDetail, // ack marks the timestamp as acknowledged. func (ta *AckManager) ack(acker *Acker) { - ta.cond.L.Lock() - defer ta.cond.L.Unlock() + ta.mu.Lock() + defer ta.mu.Unlock() acker.acknowledged = true acker.detail.EndTimestamp = ta.lastAllocatedTimeTick @@ -129,9 +123,6 @@ func (ta *AckManager) popUntilLastAllAcknowledged() { return } - // broadcast to notify the last confirmed timetick updated. - ta.cond.UnsafeBroadcast() - // update last confirmed time tick. ta.lastConfirmedTimeTick = acknowledgedDetails[len(acknowledgedDetails)-1].BeginTimestamp ta.metrics.UpdateLastConfirmedTimeTick(ta.lastConfirmedTimeTick)