kafka replay speed: upstream partition_offset_reader.go (#9448)
This is the first of a series of PRs to upstream the code for improving Kafka replay speed in the ingester.

In this PR I'm upstreaming a tiny change to partitionOffsetReader. We need caching in the reader so that we can cheaply check the start offset of the partition. The cached value doesn't need to be exact, because we only use it to find out whether we're trying to consume from before the partition start (a usage sketch follows below).

Signed-off-by: Dimitar Dimitrov <[email protected]>
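
For context, here is a minimal sketch of how the cached start offset could be consumed, assuming a helper living in the same ingest package; consumingBeforeStart, startOffsetReader and resumeOffset are illustrative names and are not part of this PR:

// consumingBeforeStart reports whether resumeOffset falls before the partition start,
// using the cached (possibly stale) start offset. A directionally correct answer is
// enough here, so staleness is acceptable.
// startOffsetReader is assumed to be a *genericOffsetReader[int64] whose fetch
// function returns the partition start offset.
func consumingBeforeStart(startOffsetReader *genericOffsetReader[int64], resumeOffset int64) (bool, error) {
	startOffset, err := startOffsetReader.CachedOffset()
	if err != nil {
		return false, fmt.Errorf("reading cached partition start offset: %w", err)
	}
	return resumeOffset < startOffset, nil
}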
dimitarvdimitrov authored Sep 27, 2024
1 parent 14be405 commit a3f4b16
Showing 2 changed files with 167 additions and 24 deletions.
22 changes: 20 additions & 2 deletions pkg/storage/ingest/partition_offset_reader.go
@@ -13,6 +13,7 @@ import (
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/atomic"
)

var (
@@ -32,16 +33,21 @@ type genericOffsetReader[O any] struct {
// request that will be issued (not the current in-flight one, if any).
nextResultPromiseMx sync.RWMutex
nextResultPromise *resultPromise[O]

// lastResultPromise is the last returned offset.
lastResultPromise *atomic.Pointer[resultPromise[O]]
}

func newGenericOffsetReader[O any](fetchLastProducedOffset func(context.Context) (O, error), pollInterval time.Duration, logger log.Logger) *genericOffsetReader[O] {
p := &genericOffsetReader[O]{
logger: logger,
fetchLastProducedOffset: fetchLastProducedOffset,
nextResultPromise: newResultPromise[O](),
lastResultPromise: atomic.NewPointer(newResultPromise[O]()),
}

p.Service = services.NewTimerService(pollInterval, nil, p.onPollInterval, p.stopping)
// Run the poll interval once at startup so we can cache the offset.
p.Service = services.NewTimerService(pollInterval, p.onPollInterval, p.onPollInterval, p.stopping)

return p
}
@@ -86,6 +92,7 @@ func (r *genericOffsetReader[O]) getAndNotifyLastProducedOffset(ctx context.Cont

// Notify whoever was waiting for it.
promise.notify(offset, err)
r.lastResultPromise.Store(promise)
}

// WaitNextFetchLastProducedOffset returns the result of the *next* "last produced offset" request
@@ -102,6 +109,12 @@ func (r *genericOffsetReader[O]) WaitNextFetchLastProducedOffset(ctx context.Con
return promise.wait(ctx)
}

// CachedOffset returns the last result of fetching the offset. This is likely outdated, but it's useful to get a directionally correct value quickly.
func (r *genericOffsetReader[O]) CachedOffset() (O, error) {
c := r.lastResultPromise.Load()
return c.resultValue, c.resultErr
}

// partitionOffsetReader is responsible to read the offsets of a single partition.
type partitionOffsetReader struct {
*genericOffsetReader[int64]
@@ -112,8 +125,13 @@ type partitionOffsetReader struct {
}

func newPartitionOffsetReader(client *kgo.Client, topic string, partitionID int32, pollInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *partitionOffsetReader {
offsetClient := newPartitionOffsetClient(client, topic, reg, logger)
return newPartitionOffsetReaderWithOffsetClient(offsetClient, partitionID, pollInterval, logger)
}

func newPartitionOffsetReaderWithOffsetClient(offsetClient *partitionOffsetClient, partitionID int32, pollInterval time.Duration, logger log.Logger) *partitionOffsetReader {
r := &partitionOffsetReader{
client: newPartitionOffsetClient(client, topic, reg, logger),
client: offsetClient,
partitionID: partitionID,
logger: logger, // Do not wrap with partition ID because it's already done by the caller.
}
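
The split constructor also allows injecting an already-built partitionOffsetClient, for instance to share one client across readers for several partitions. A hedged sketch of such wiring, not part of this PR (client, topic, reg, logger, partitionIDs and pollInterval are assumed to be in scope):

// Build the offset client once and reuse it for every partition reader.
offsetClient := newPartitionOffsetClient(client, topic, reg, logger)

readers := make([]*partitionOffsetReader, 0, len(partitionIDs))
for _, partitionID := range partitionIDs {
	// Each reader polls its own partition but shares the underlying offset client.
	readers = append(readers, newPartitionOffsetReaderWithOffsetClient(offsetClient, partitionID, pollInterval, logger))
}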
169 changes: 147 additions & 22 deletions pkg/storage/ingest/partition_offset_reader_test.go
@@ -4,6 +4,7 @@ package ingest

import (
"context"
"fmt"
"sync"
"testing"
"time"
@@ -81,15 +82,19 @@ func TestPartitionOffsetReader_WaitNextFetchLastProducedOffset(t *testing.T) {
client = createTestKafkaClient(t, kafkaCfg)
reader = newPartitionOffsetReader(client, topicName, partitionID, pollInterval, nil, logger)

lastOffset = atomic.NewInt64(1)
firstRequestReceived = make(chan struct{})
lastOffset = atomic.NewInt64(1)
firstRequestReceived = make(chan struct{})
secondRequestReceived = make(chan struct{})
)

cluster.ControlKey(int16(kmsg.ListOffsets), func(kreq kmsg.Request) (kmsg.Response, error, bool) {
cluster.KeepControl()

if lastOffset.Load() == 1 {
switch lastOffset.Load() {
case 1:
close(firstRequestReceived)
case 2:
close(secondRequestReceived)
}

// Mock the response so that we can increase the offset each time.
@@ -109,20 +114,22 @@

wg := sync.WaitGroup{}

// The 1st WaitNextFetchLastProducedOffset() is called before the service starts, so it's expected
// to wait the result of the 1st request.
runAsync(&wg, func() {
// The 1st WaitNextFetchLastProducedOffset() is called before the service starts.
// The service fetches the offset once at startup, so this first wait is expected
// to get the result of the 2nd request.
// Without synchronisation it could also catch the result of the 1st request, so we synchronise to avoid flaky tests.
runAsyncAfter(&wg, firstRequestReceived, func() {
actual, err := reader.WaitNextFetchLastProducedOffset(ctx)
require.NoError(t, err)
assert.Equal(t, int64(1), actual)
assert.Equal(t, int64(2), actual)
})

// The 2nd WaitNextFetchLastProducedOffset() is called while the 1st request is running, so it's expected
// to wait the result of the 2nd request.
runAsyncAfter(&wg, firstRequestReceived, func() {
// The 2nd WaitNextFetchLastProducedOffset() is called while the 1st one is still waiting, so it's expected
// to wait for the result of the 3rd request.
runAsyncAfter(&wg, secondRequestReceived, func() {
actual, err := reader.WaitNextFetchLastProducedOffset(ctx)
require.NoError(t, err)
assert.Equal(t, int64(2), actual)
assert.Equal(t, int64(3), actual)
})

// Now we can start the service.
@@ -215,15 +222,19 @@ func TestTopicOffsetsReader_WaitNextFetchLastProducedOffset(t *testing.T) {
client = createTestKafkaClient(t, kafkaCfg)
reader = NewTopicOffsetsReader(client, topicName, allPartitionIDs, pollInterval, nil, logger)

lastOffset = atomic.NewInt64(1)
firstRequestReceived = make(chan struct{})
lastOffset = atomic.NewInt64(1)
firstRequestReceived = make(chan struct{})
secondRequestReceived = make(chan struct{})
)

cluster.ControlKey(int16(kmsg.ListOffsets), func(kreq kmsg.Request) (kmsg.Response, error, bool) {
cluster.KeepControl()

if lastOffset.Load() == 1 {
switch lastOffset.Load() {
case 1:
close(firstRequestReceived)
case 3:
close(secondRequestReceived)
}

// Mock the response so that we can increase the offset each time.
@@ -247,20 +258,22 @@ func TestTopicOffsetsReader_WaitNextFetchLastProducedOffset(t *testing.T) {

wg := sync.WaitGroup{}

// The 1st WaitNextFetchLastProducedOffset() is called before the service starts, so it's expected
// to wait the result of the 1st request.
runAsync(&wg, func() {
// The 1st WaitNextFetchLastProducedOffset() is called before the service starts.
// The service fetches the offset once at startup, so this first wait is expected
// to get the result of the 2nd request.
// Without synchronisation it could also catch the result of the 1st request, so we synchronise to avoid flaky tests.
runAsyncAfter(&wg, firstRequestReceived, func() {
actual, err := reader.WaitNextFetchLastProducedOffset(ctx)
require.NoError(t, err)
assert.Equal(t, map[int32]int64{0: int64(1), 1: int64(2)}, actual)
assert.Equal(t, map[int32]int64{0: int64(3), 1: int64(4)}, actual)
})

// The 2nd WaitNextFetchLastProducedOffset() is called while the 1st request is running, so it's expected
// to wait the result of the 2nd request.
runAsyncAfter(&wg, firstRequestReceived, func() {
// The 2nd WaitNextFetchLastProducedOffset() is called while the 1st one is still waiting, so it's expected
// to wait for the result of the 3rd request.
runAsyncAfter(&wg, secondRequestReceived, func() {
actual, err := reader.WaitNextFetchLastProducedOffset(ctx)
require.NoError(t, err)
assert.Equal(t, map[int32]int64{0: int64(3), 1: int64(4)}, actual)
assert.Equal(t, map[int32]int64{0: int64(5), 1: int64(6)}, actual)
})

// Now we can start the service.
@@ -289,3 +302,115 @@ func TestTopicOffsetsReader_WaitNextFetchLastProducedOffset(t *testing.T) {
assert.ErrorIs(t, err, context.Canceled)
})
}

func TestGenericPartitionReader_Caching(t *testing.T) {
logger := log.NewNopLogger()

t.Run("should initialize with fetched offset", func(t *testing.T) {
ctx := context.Background()
mockFetch := func(context.Context) (int64, error) {
return 42, nil
}

reader := newGenericOffsetReader[int64](mockFetch, time.Second, logger)
require.NoError(t, services.StartAndAwaitRunning(ctx, reader))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))
})

offset, err := reader.CachedOffset()
assert.NoError(t, err)
assert.Equal(t, int64(42), offset)
})

t.Run("should cache error from initial fetch", func(t *testing.T) {
ctx := context.Background()
expectedErr := fmt.Errorf("fetch error")
mockFetch := func(context.Context) (int64, error) {
return 0, expectedErr
}

reader := newGenericOffsetReader[int64](mockFetch, time.Second, logger)
require.NoError(t, services.StartAndAwaitRunning(ctx, reader))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))
})

offset, err := reader.CachedOffset()
assert.ErrorIs(t, err, expectedErr)
assert.Equal(t, int64(0), offset)
})

t.Run("should update cache on poll interval", func(t *testing.T) {
ctx := context.Background()
fetchCount := 0
fetchChan := make(chan struct{}, 3) // Buffer size of 3 to allow multiple fetches
mockFetch := func(ctx context.Context) (int64, error) {
fetchCount++
select {
case <-ctx.Done():
case fetchChan <- struct{}{}:
}
return int64(fetchCount), nil
}

reader := newGenericOffsetReader[int64](mockFetch, 10*time.Millisecond, logger)
require.NoError(t, services.StartAndAwaitRunning(ctx, reader))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))
})

// Wait until the 3rd fetch starts: fetches run sequentially, so by then at least two results have been cached.
<-fetchChan
<-fetchChan
<-fetchChan

offset, err := reader.CachedOffset()
assert.NoError(t, err)
assert.GreaterOrEqual(t, offset, int64(2), "Offset should have been updated at least once")
})

t.Run("should handle context cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
mockFetch := func(context.Context) (int64, error) {
return 42, nil
}

reader := newGenericOffsetReader[int64](mockFetch, time.Second, logger)
require.NoError(t, services.StartAndAwaitRunning(ctx, reader))
t.Cleanup(func() {
cancel()
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), reader))
})

// The cached offset should be available
offset, err := reader.CachedOffset()
assert.NoError(t, err)
assert.Equal(t, int64(42), offset)
})

t.Run("should handle concurrent access", func(t *testing.T) {
ctx := context.Background()
mockFetch := func(context.Context) (int64, error) {
return 42, nil
}

reader := newGenericOffsetReader[int64](mockFetch, time.Second, logger)
require.NoError(t, services.StartAndAwaitRunning(ctx, reader))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, reader))
})

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
offset, err := reader.CachedOffset()
assert.NoError(t, err)
assert.Equal(t, int64(42), offset)
}()
}
wg.Wait()
})
}
