Skip to content

Commit

Permalink
enhance: speed up streaming barrier timetick (milvus-io#38787)
Browse files Browse the repository at this point in the history
issue: milvus-io#38399

- use last allocate but not last confirmed id to make barrier. 
- move the barrier logic into the timetick allocator.
- try to sync up local allocator and remote allocator when first barrier
check not pass to speed up.

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Dec 27, 2024
1 parent a882f34 commit 4ba0ed3
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 32 deletions.
82 changes: 71 additions & 11 deletions internal/streamingnode/server/resource/idalloc/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"sync"
"time"

"github.com/cockroachdb/errors"

"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)
Expand All @@ -33,7 +35,7 @@ var _ Allocator = (*allocatorImpl)(nil)
// NewTSOAllocator creates a new allocator.
func NewTSOAllocator(rc *syncutil.Future[types.RootCoordClient]) Allocator {
return &allocatorImpl{
mu: sync.Mutex{},
cond: syncutil.NewContextCond(&sync.Mutex{}),
remoteAllocator: newTSOAllocator(rc),
localAllocator: newLocalAllocator(),
}
Expand All @@ -42,7 +44,7 @@ func NewTSOAllocator(rc *syncutil.Future[types.RootCoordClient]) Allocator {
// NewIDAllocator creates a new allocator.
func NewIDAllocator(rc *syncutil.Future[types.RootCoordClient]) Allocator {
return &allocatorImpl{
mu: sync.Mutex{},
cond: syncutil.NewContextCond(&sync.Mutex{}),
remoteAllocator: newIDAllocator(rc),
localAllocator: newLocalAllocator(),
}
Expand All @@ -56,6 +58,9 @@ type Allocator interface {
// Allocate allocates a timestamp.
Allocate(ctx context.Context) (uint64, error)

// BarrierUtil make a barrier, next allocate call will generate id greater than barrier.
BarrierUntil(ctx context.Context, barrier uint64) error

// Sync expire the local allocator messages,
// syncs the local allocator and remote allocator.
Sync()
Expand All @@ -65,37 +70,92 @@ type Allocator interface {
}

type allocatorImpl struct {
mu sync.Mutex
cond *syncutil.ContextCond
remoteAllocator remoteBatchAllocator
lastSyncTime time.Time
lastAllocated uint64
localAllocator *localAllocator
}

// AllocateOne allocates a timestamp.
func (ta *allocatorImpl) Allocate(ctx context.Context) (uint64, error) {
ta.mu.Lock()
defer ta.mu.Unlock()
ta.cond.L.Lock()
defer ta.cond.L.Unlock()

return ta.allocateOne(ctx)
}

func (ta *allocatorImpl) BarrierUntil(ctx context.Context, barrier uint64) error {
err := ta.barrierFastPath(ctx, barrier)
if err == nil {
return nil
}
if !errors.Is(err, errFastPathFailed) {
return err
}

// Fall back to the slow path to avoid block other id allocation opeartions.
ta.cond.L.Lock()
for ta.lastAllocated < barrier {
if err := ta.cond.Wait(ctx); err != nil {
return err
}
}
ta.cond.L.Unlock()
return nil
}

func (ta *allocatorImpl) barrierFastPath(ctx context.Context, barrier uint64) error {
ta.cond.L.Lock()
defer ta.cond.L.Unlock()

for i := 0; i < 2; i++ {
id, err := ta.allocateOne(ctx)
if err != nil {
return err
}

// check if the allocated id is greater than barrier.
if id >= barrier {
return nil
}
if i == 0 {
// force to syncup the local allocator and remote allocator at first time.
// It's the fast path if the barrier is allocated from same remote allocator.
ta.localAllocator.exhausted()
}
}
return errFastPathFailed
}

func (ta *allocatorImpl) allocateOne(ctx context.Context) (uint64, error) {
// allocate one from local allocator first.
if id, err := ta.localAllocator.allocateOne(); err == nil {
ta.lastAllocated = id
ta.cond.UnsafeBroadcast()
return id, nil
}
// allocate from remote.
return ta.allocateRemote(ctx)
id, err := ta.allocateRemote(ctx)
if err != nil {
return 0, err
}
ta.lastAllocated = id
ta.cond.UnsafeBroadcast()
return id, nil
}

// Sync expire the local allocator messages,
// syncs the local allocator and remote allocator.
func (ta *allocatorImpl) Sync() {
ta.mu.Lock()
defer ta.mu.Unlock()
ta.cond.L.Lock()
defer ta.cond.L.Unlock()

ta.localAllocator.exhausted()
}

func (ta *allocatorImpl) SyncIfExpired(expire time.Duration) {
ta.mu.Lock()
defer ta.mu.Unlock()
ta.cond.L.Lock()
defer ta.cond.L.Unlock()

if time.Since(ta.lastSyncTime) > expire {
ta.localAllocator.exhausted()
Expand Down
49 changes: 49 additions & 0 deletions internal/streamingnode/server/resource/idalloc/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,52 @@ func TestTimestampAllocator(t *testing.T) {
_, err := allocator.Allocate(context.Background())
assert.Error(t, err)
}

func TestIDAllocator(t *testing.T) {
paramtable.Init()
paramtable.SetNodeID(1)

client := NewMockRootCoordClient(t)
f := syncutil.NewFuture[types.RootCoordClient]()
f.Set(client)

allocator := NewIDAllocator(f)

// Make local dirty
allocator.Allocate(context.Background())
// Test barrier fast path.
resp, err := client.AllocID(context.Background(), &rootcoordpb.AllocIDRequest{
Count: 100,
})
assert.NoError(t, err)
err = allocator.BarrierUntil(context.Background(), uint64(resp.ID))
assert.NoError(t, err)
newBarrierTimeTick, err := allocator.Allocate(context.Background())
assert.NoError(t, err)
assert.Greater(t, newBarrierTimeTick, uint64(resp.ID))

// Test slow path.
ch := make(chan struct{})
go func() {
barrier := newBarrierTimeTick + 1*batchAllocateSize
err := allocator.BarrierUntil(context.Background(), barrier)
assert.NoError(t, err)
newBarrierTimeTick, err := allocator.Allocate(context.Background())
assert.NoError(t, err)
assert.Greater(t, newBarrierTimeTick, barrier)
close(ch)
}()
select {
case <-ch:
assert.Fail(t, "should not finish")
case <-time.After(time.Millisecond * 20):
}
allocator.Sync()
_, err = allocator.Allocate(context.Background())
assert.NoError(t, err)
<-ch

allocator.SyncIfExpired(time.Millisecond * 50)
time.Sleep(time.Millisecond * 10)
allocator.SyncIfExpired(time.Millisecond * 10)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import (
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

var errExhausted = errors.New("exhausted")
var (
errExhausted = errors.New("exhausted")
errFastPathFailed = errors.New("fast path failed")
)

// newLocalAllocator creates a new local allocator.
func newLocalAllocator() *localAllocator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/metricsutil"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/syncutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// AckManager manages the timestampAck.
type AckManager struct {
cond *syncutil.ContextCond
mu sync.Mutex
lastAllocatedTimeTick uint64 // The last allocated time tick, the latest timestamp allocated by the allocator.
lastConfirmedTimeTick uint64 // The last confirmed time tick, the message which time tick less than lastConfirmedTimeTick has been committed into wal.
notAckHeap typeutil.Heap[*Acker] // A minimum heap of timestampAck to search minimum allocated but not ack timestamp in list.
Expand All @@ -34,7 +33,7 @@ func NewAckManager(
metrics *metricsutil.TimeTickMetrics,
) *AckManager {
return &AckManager{
cond: syncutil.NewContextCond(&sync.Mutex{}),
mu: sync.Mutex{},
lastAllocatedTimeTick: 0,
notAckHeap: typeutil.NewHeap[*Acker](&ackersOrderByTimestamp{}),
ackHeap: typeutil.NewHeap[*Acker](&ackersOrderByEndTimestamp{}),
Expand All @@ -46,23 +45,18 @@ func NewAckManager(

// AllocateWithBarrier allocates a timestamp with a barrier.
func (ta *AckManager) AllocateWithBarrier(ctx context.Context, barrierTimeTick uint64) (*Acker, error) {
// wait until the lastConfirmedTimeTick is greater than barrierTimeTick.
ta.cond.L.Lock()
for ta.lastConfirmedTimeTick <= barrierTimeTick {
if err := ta.cond.Wait(ctx); err != nil {
return nil, err
}
// Just make a barrier to the underlying allocator.
if err := resource.Resource().TSOAllocator().BarrierUntil(ctx, barrierTimeTick); err != nil {
return nil, err
}
ta.cond.L.Unlock()

return ta.Allocate(ctx)
}

// Allocate allocates a timestamp.
// Concurrent safe to call with Sync and Allocate.
func (ta *AckManager) Allocate(ctx context.Context) (*Acker, error) {
ta.cond.L.Lock()
defer ta.cond.L.Unlock()
ta.mu.Lock()
defer ta.mu.Unlock()

// allocate one from underlying allocator first.
ts, err := resource.Resource().TSOAllocator().Allocate(ctx)
Expand Down Expand Up @@ -97,8 +91,8 @@ func (ta *AckManager) SyncAndGetAcknowledged(ctx context.Context) ([]*AckDetail,
}
tsWithAck.Ack(OptSync())

ta.cond.L.Lock()
defer ta.cond.L.Unlock()
ta.mu.Lock()
defer ta.mu.Unlock()

details := ta.acknowledgedDetails
ta.acknowledgedDetails = make(sortedDetails, 0, 5)
Expand All @@ -107,8 +101,8 @@ func (ta *AckManager) SyncAndGetAcknowledged(ctx context.Context) ([]*AckDetail,

// ack marks the timestamp as acknowledged.
func (ta *AckManager) ack(acker *Acker) {
ta.cond.L.Lock()
defer ta.cond.L.Unlock()
ta.mu.Lock()
defer ta.mu.Unlock()

acker.acknowledged = true
acker.detail.EndTimestamp = ta.lastAllocatedTimeTick
Expand All @@ -129,9 +123,6 @@ func (ta *AckManager) popUntilLastAllAcknowledged() {
return
}

// broadcast to notify the last confirmed timetick updated.
ta.cond.UnsafeBroadcast()

// update last confirmed time tick.
ta.lastConfirmedTimeTick = acknowledgedDetails[len(acknowledgedDetails)-1].BeginTimestamp
ta.metrics.UpdateLastConfirmedTimeTick(ta.lastConfirmedTimeTick)
Expand Down

0 comments on commit 4ba0ed3

Please sign in to comment.