kafka replay speed: don't send requests for higher than the high watermark (#9661)

* kafka replay speed: don't send requests for higher than the high watermark

This is related to a problem we have: high end-to-end latency when fetching at the head of the partition. The current theory is that WarpStream introduces extra delay when a fetch requests an offset which doesn't exist yet.

Currently, we have two sets of flags: one for startup and one for ongoing fetching. This PR enables us to have just a single configuration which determines the max concurrency. We limit concurrency by never requesting an offset which doesn't exist.
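
As a rough, hypothetical sketch of the gating idea (the names dispatch and fetchWant below are illustrative, not the actual Mimir types): the dispatcher leaves its send channel nil, which can never proceed in a select, unless the next fetch targets an existing offset or nothing is in flight.

package main

import (
	"fmt"
	"sync/atomic"
)

type fetchWant struct{ startOffset int64 }

// dispatch sends next on wants only when it targets an offset at or below
// the last observed high watermark, or when nothing is in flight (otherwise
// the watermark would never be refreshed and we'd deadlock).
func dispatch(wants chan fetchWant, hwm *atomic.Int64, next fetchWant, inflight int) {
	// A nil channel can never be sent on, so this select case stays
	// disabled unless we explicitly enable it below.
	var send chan fetchWant
	if inflight == 0 || next.startOffset <= hwm.Load() {
		send = wants
	}
	select {
	case send <- next:
		fmt.Println("dispatched offset", next.startOffset)
	default:
		fmt.Println("held back offset", next.startOffset)
	}
}

func main() {
	hwm := new(atomic.Int64)
	hwm.Store(5) // pretend a previous fetch reported HWM=5
	wants := make(chan fetchWant, 1)
	dispatch(wants, hwm, fetchWant{startOffset: 3}, 1) // 3 <= 5: dispatched
	dispatch(wants, hwm, fetchWant{startOffset: 9}, 1) // 9 > 5: held back
}

The real loop in fetcher.go selects on shutdown and context cancellation rather than a default case; the sketch only prints the decision.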

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Update pkg/storage/ingest/fetcher.go

Co-authored-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
Co-authored-by: Marco Pracucci <[email protected]>
dimitarvdimitrov and pracucci authored Oct 18, 2024
1 parent 4e66244 commit 2af261a
Showing 2 changed files with 229 additions and 4 deletions.
33 changes: 29 additions & 4 deletions pkg/storage/ingest/fetcher.go
@@ -21,6 +21,7 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/plugin/kotel"
"go.uber.org/atomic"

"github.com/grafana/mimir/pkg/util/spanlogger"
)
@@ -474,7 +475,7 @@ func sumRecordLengths(records []*kgo.Record) (sum int) {
return sum
}

func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logger log.Logger) {
func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logger log.Logger, highWatermark *atomic.Int64) {
defer r.wg.Done()

errBackoff := backoff.New(ctx, backoff.Config{
@@ -500,7 +501,9 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logger log.Logger) {
if f.Err != nil {
w = handleKafkaFetchErr(f.Err, w, errBackoff, r.startOffsets, r.client, attemptSpan)
}

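// A fetch response reports a negative high watermark when the broker didn't include one.
// Otherwise, remember the highest value seen so far so the dispatcher can avoid requesting offsets that don't exist yet.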
if hwm := f.HighWatermark; hwm >= 0 {
casHWM(highWatermark, hwm)
}
if len(f.Records) == 0 {
// Typically if we had an error, then there wouldn't be any records.
// But it's hard to verify this for all errors from the Kafka API docs, so just to be sure, we process any records we might have received.
@@ -557,15 +560,27 @@ func (r *concurrentFetchers) run(ctx context.Context, wants chan fetchWant, logger log.Logger) {
}
}

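// casHWM advances highWatermark to newHWM unless it already holds a higher value.
// The CAS loop ensures concurrent fetchers can only move the watermark forward, which a plain Store couldn't guarantee.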
func casHWM(highWatermark *atomic.Int64, newHWM int64) {
for hwm := highWatermark.Load(); hwm < newHWM; hwm = highWatermark.Load() {
if highWatermark.CompareAndSwap(hwm, newHWM) {
break
}
}
}

func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concurrency, recordsPerFetch int) {
level.Info(r.logger).Log("msg", "starting concurrent fetchers", "start_offset", startOffset, "concurrency", concurrency, "recordsPerFetch", recordsPerFetch)

// The HWM is updated by the fetchers. A value of 0 is equivalent to no records having been produced.
// A value of 0 doesn't prevent progress, because we ensure there is always at least one dispatched fetchWant.
highWatermark := atomic.NewInt64(0)

wants := make(chan fetchWant)
defer close(wants)
r.wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
logger := log.With(r.logger, "fetcher", i)
go r.run(ctx, wants, logger)
go r.run(ctx, wants, logger, highWatermark)
}

var (
@@ -587,13 +602,23 @@ func (r *concurrentFetchers) start(ctx context.Context, startOffset int64, concurrency, recordsPerFetch int) {
// So we don't try to get new results from the fetchers.
refillBufferedResult = nil
}
dispatchNextWant := chan fetchWant(nil)
if nextResult == nil || nextFetch.startOffset <= highWatermark.Load() {
// In WarpStream, fetching past the end of the partition induced more delay than MinBytesWaitTime.
// So we dispatch a fetch only if it's fetching an existing offset.
// This shouldn't noticeably affect performance with Apache Kafka; after all, franz-go only has a concurrency of 1 per partition.
//
// At the same time, we don't want to reach a deadlock where the HWM is never updated and there are no fetches in flight.
// When there isn't a fetch in flight, the HWM will never be updated, so we dispatch the next fetchWant even if its offset is above the HWM.
dispatchNextWant = wants
}
select {
case <-r.done:
return
case <-ctx.Done():
return

case wants <- nextFetch:
case dispatchNextWant <- nextFetch:
pendingResults.PushBack(nextFetch.result)
if nextResult == nil {
// In case we previously exhausted pendingResults, we just created
200 changes: 200 additions & 0 deletions pkg/storage/ingest/fetcher_test.go
@@ -685,6 +685,206 @@ func TestConcurrentFetchers(t *testing.T) {
t.Logf("Records produced after start: %d", additionalRecords)
t.Logf("Records fetched: %d", len(fetchedRecords))
})

t.Run("staggered production with exact multiple of concurrency and records per fetch", func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

const (
topicName = "test-topic"
partitionID = 1
concurrency = 2
recordsPerFetch = 3
)

_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
client := newKafkaProduceClient(t, clusterAddr)

fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)

// Produce exactly as many records as the product of concurrency and records per fetch.
// This gives each fetcher exactly as many records as it's supposed to fetch.
const initiallyProducedRecords = concurrency * recordsPerFetch
var producedRecordsBytes [][]byte
for i := 0; i < initiallyProducedRecords; i++ {
record := []byte(fmt.Sprintf("record-%d", i+1))
produceRecord(ctx, t, client, topicName, partitionID, record)
producedRecordsBytes = append(producedRecordsBytes, record)
}

// Expect that we've received all records.
var fetchedRecordsBytes [][]byte
for len(fetchedRecordsBytes) < initiallyProducedRecords {
fetches, _ := fetchers.PollFetches(ctx)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})
}

// Produce a few more records
const additionalRecords = 3
for i := 0; i < additionalRecords; i++ {
record := []byte(fmt.Sprintf("additional-record-%d", i+1))
produceRecord(ctx, t, client, topicName, partitionID, record)
producedRecordsBytes = append(producedRecordsBytes, record)
}

// Fetchers shouldn't be stalled and should continue fetching as the HWM moves forward.
for len(fetchedRecordsBytes) < initiallyProducedRecords+additionalRecords {
fetches, _ := fetchers.PollFetches(ctx)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})
}

assert.Equal(t, producedRecordsBytes, fetchedRecordsBytes)
})

t.Run("staggered production with one less than multiple of concurrency and records per fetch", func(t *testing.T) {
// This test is the same as "staggered production with exact multiple of concurrency and records per fetch"
// but covers an off-by-one error.
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

const (
topicName = "test-topic"
partitionID = 1
concurrency = 2
recordsPerFetch = 3
)

cluster, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
client := newKafkaProduceClient(t, clusterAddr)

fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)

// Produce one record less than the product of concurrency and records per fetch.
// This covers an off-by-one error in the high-watermark gating.
const initiallyProducedRecords = concurrency*recordsPerFetch - 1
var producedRecordsBytes [][]byte
for i := 0; i < initiallyProducedRecords; i++ {
record := []byte(fmt.Sprintf("record-%d", i+1))
produceRecord(ctx, t, client, topicName, partitionID, record)
producedRecordsBytes = append(producedRecordsBytes, record)
}

// Expect that we've received all records.
var fetchedRecordsBytes [][]byte
for len(fetchedRecordsBytes) < initiallyProducedRecords {
fetches, _ := fetchers.PollFetches(ctx)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})
}

// Produce a few more records
const additionalRecords = 3
for i := 0; i < additionalRecords; i++ {
record := []byte(fmt.Sprintf("additional-record-%d", i+1))
produceRecord(ctx, t, client, topicName, partitionID, record)
producedRecordsBytes = append(producedRecordsBytes, record)
}

// Fetchers shouldn't be stalled and should continue fetching as the HWM moves forward.
for len(fetchedRecordsBytes) < initiallyProducedRecords+additionalRecords {
fetches, _ := fetchers.PollFetches(ctx)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})
}

assert.Equal(t, producedRecordsBytes, fetchedRecordsBytes)

// Mock Kafka to fail any further Fetch requests.
cluster.ControlKey(int16(kmsg.Fetch), func(kmsg.Request) (kmsg.Response, error, bool) {
cluster.KeepControl()

return nil, errors.New("mocked error"), true
})
})

t.Run("fetchers do not request offset beyond high watermark", func(t *testing.T) {
// In WarpStream, fetching past the end of the partition induced more delay than MinBytesWaitTime,
// so we avoid dispatching fetches past the high watermark.
t.Parallel()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

const (
topicName = "test-topic"
partitionID = 1
concurrency = 2
recordsPerFetch = 3
initialRecords = 8
)

cluster, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
client := newKafkaProduceClient(t, clusterAddr)

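// These counters are shared with the mocked broker's control function registered below.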
fetchRequestCount := atomic.NewInt64(0)
maxRequestedOffset := atomic.NewInt64(-1)

fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)

// Produce initial records
var producedRecordsBytes [][]byte
for i := 0; i < initialRecords; i++ {
record := []byte(fmt.Sprintf("record-%d", i+1))
producedRecordsBytes = append(producedRecordsBytes, record)
offset := produceRecord(ctx, t, client, topicName, partitionID, record)
t.Log("Produced record at offset", offset)
}

// Fetch and verify records; this should unblock the fetchers.
var fetchedRecordsBytes [][]byte
for len(fetchedRecordsBytes) < initialRecords {
fetches, _ := fetchers.PollFetches(ctx)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})
}

// Set up control function to monitor fetch requests
var checkRequestOffset func(req kmsg.Request) (kmsg.Response, error, bool)
checkRequestOffset = func(req kmsg.Request) (kmsg.Response, error, bool) {
fetchReq := req.(*kmsg.FetchRequest)
cluster.KeepControl()
fetchRequestCount.Inc()
assert.Len(t, fetchReq.Topics, 1)
assert.Len(t, fetchReq.Topics[0].Partitions, 1)
requestedOffset := fetchReq.Topics[0].Partitions[0].FetchOffset
// Track the maximum requested offset; reuse the CAS helper so concurrent invocations can't move it backwards, which a plain Store could.
casHWM(maxRequestedOffset, requestedOffset)
t.Log("Received fetch request for offset", requestedOffset)

cluster.DropControl() // Let the cluster handle the request normally
cluster.ControlKey(kmsg.Fetch.Int16(), checkRequestOffset) // But register the function again so we can inspect the next request too.

return nil, nil, false
}
cluster.ControlKey(kmsg.Fetch.Int16(), checkRequestOffset)

// Wait for a few fetch requests
require.Eventually(t, func() bool {
return fetchRequestCount.Load() >= 10
}, 30*time.Second, 100*time.Millisecond, "Not enough fetch requests received")

// Verify that the max requested offset does not exceed the number of produced records
assert.LessOrEqualf(t, int(maxRequestedOffset.Load()), len(producedRecordsBytes),
"Requested offset (%d) should not exceed the number of produced records (%d)", maxRequestedOffset.Load(), len(producedRecordsBytes))

// Verify the number and content of fetched records
assert.Equal(t, producedRecordsBytes, fetchedRecordsBytes, "Should fetch all produced records")
})
}

func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency, recordsPerFetch int) *concurrentFetchers {
