From 861563d604d34f2860ba487fe34f8cb6b121f3b4 Mon Sep 17 00:00:00 2001
From: Dimitar Dimitrov
Date: Thu, 26 Sep 2024 16:06:20 +0200
Subject: [PATCH] kafka replay speed: concurrency fetching improvements (#9389)

* fetched records include timestamps
Signed-off-by: Dimitar Dimitrov

* try with defaultMinBytesWaitTime=3s
Signed-off-by: Dimitar Dimitrov

* add fetch_min_bytes_max_wait
Signed-off-by: Dimitar Dimitrov

* Don't block on sending to the channel
Signed-off-by: Dimitar Dimitrov

* Remove wait for when we're fetching from the end
Signed-off-by: Dimitar Dimitrov

* Fix bug with blocking on fetch
Signed-off-by: Dimitar Dimitrov

* Slightly easier to follow lifecycle of previousResult
Signed-off-by: Dimitar Dimitrov

* Correct merging of results
Signed-off-by: Dimitar Dimitrov

* Avoid double-logging events
Signed-off-by: Dimitar Dimitrov

* Revert "add fetch_min_bytes_max_wait"
This reverts commit 6197d4b9c88af3900c05879825cb940916b215eb.

* Increase MinBytesWaitTime to 5s
Signed-off-by: Dimitar Dimitrov

* Add comment about warpstream and MinBytes
Signed-off-by: Dimitar Dimitrov

* Address review comments
Signed-off-by: gotjosh

* Add tests for concurrentFetchers
Signed-off-by: Dimitar Dimitrov

* Fix bugs in tracking lastReturnedRecord
Signed-off-by: Dimitar Dimitrov

* Renamed method
Signed-off-by: gotjosh

* use the older context
Signed-off-by: gotjosh

* Name variable correct variable name
Signed-off-by: Dimitar Dimitrov

* Reduce MaxWaitTime in PartitionReader tests
Signed-off-by: Dimitar Dimitrov

* Change test createConcurrentFetchers signature
Signed-off-by: Dimitar Dimitrov

* Sort imports
Signed-off-by: Dimitar Dimitrov

---------

Signed-off-by: Dimitar Dimitrov
Signed-off-by: gotjosh
Co-authored-by: gotjosh
---
 pkg/storage/ingest/reader.go        |  72 +++++--
 pkg/storage/ingest/reader_client.go |   4 +-
 pkg/storage/ingest/reader_test.go   | 301 ++++++++++++++++++++++++++--
 3 files changed, 334 insertions(+), 43 deletions(-)

diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go
index 92eb10c0eb..7f0aa633b0 100644
--- a/pkg/storage/ingest/reader.go
+++ b/pkg/storage/ingest/reader.go
@@ -38,9 +38,10 @@ const (
 	// kafkaOffsetEnd is a special offset value that means the end of the partition.
 	kafkaOffsetEnd = int64(-1)

-	// defaultMinBytesWaitTime is the time the Kafka broker can wait for MinBytes to be filled.
+	// defaultMinBytesMaxWaitTime is the time the Kafka broker can wait for MinBytes to be filled.
 	// This is usually used when there aren't enough records available to fulfil MinBytes, so the broker waits for more records to be produced.
-	defaultMinBytesWaitTime = 10 * time.Second
+	// Warpstream clamps this between 5s and 30s.
+	defaultMinBytesMaxWaitTime = 5 * time.Second
 )

 var (
@@ -119,7 +120,7 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri
 		consumerGroup:         kafkaCfg.GetConsumerGroup(instanceID, partitionID),
 		metrics:               newReaderMetrics(partitionID, reg),
 		consumedOffsetWatcher: newPartitionOffsetWatcher(),
-		concurrentFetchersMinBytesMaxWaitTime: defaultMinBytesWaitTime,
+		concurrentFetchersMinBytesMaxWaitTime: defaultMinBytesMaxWaitTime,
 		logger:                log.With(logger, "partition", partitionID),
 		reg:                   reg,
 	}
@@ -736,7 +737,7 @@ func fetchWantFrom(offset int64, recordsPerFetch int) fetchWant {
 	return fetchWant{
 		startOffset: offset,
 		endOffset:   offset + int64(recordsPerFetch),
-		result:      make(chan fetchResult, 1), // buffer of 1 so we can do secondary attempt requests in the background
+		result:      make(chan fetchResult),
 	}
 }
@@ -819,6 +820,11 @@ func (fr *fetchResult) logCompletedFetch(fetchStartTime time.Time, w fetchWant)
 	default:
 		logger = level.Error(logger)
 	}
+	var firstTimestamp, lastTimestamp string
+	if gotRecords > 0 {
+		firstTimestamp = fr.Records[0].Timestamp.String()
+		lastTimestamp = fr.Records[gotRecords-1].Timestamp.String()
+	}
 	logger.Log(
 		"msg", msg,
 		"duration", time.Since(fetchStartTime),
@@ -830,6 +836,8 @@ func (fr *fetchResult) logCompletedFetch(fetchStartTime time.Time, w fetchWant)
 		"asked_bytes", w.MaxBytes(),
 		"got_bytes", fr.fetchedBytes,
 		"diff_bytes", int(w.MaxBytes())-fr.fetchedBytes,
+		"first_timestamp", firstTimestamp,
+		"last_timestamp", lastTimestamp,
 		"hwm", fr.HighWatermark,
 		"lso", fr.LogStartOffset,
 		"err", fr.Err,
@@ -841,9 +849,26 @@ func (fr *fetchResult) startWaitingForConsumption() {
 }

 func (fr *fetchResult) finishWaitingForConsumption() {
+	if fr.waitingToBePickedUpFromOrderedFetchesSpan == nil {
+		fr.waitingToBePickedUpFromOrderedFetchesSpan, fr.ctx = opentracing.StartSpanFromContext(fr.ctx, "fetchResult.noWaitingForConsumption")
+	}
 	fr.waitingToBePickedUpFromOrderedFetchesSpan.Finish()
 }

+// Merge merges fr with an older fetchResult. The merged result keeps most of the fields of fr and assumes they are more up to date than older's.
+func (fr *fetchResult) Merge(older fetchResult) fetchResult {
+	if older.ctx != nil {
+		level.Debug(spanlogger.FromContext(older.ctx, log.NewNopLogger())).Log("msg", "merged fetch result with the next result")
+	}
+
+	// older.Records are older than fr.Records, so we append them first.
+	fr.Records = append(older.Records, fr.Records...)
+
+	// We ignore HighWatermark, LogStartOffset, LastStableOffset because this result should be more up to date.
+	fr.fetchedBytes += older.fetchedBytes
+	return *fr
+}
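As an aside on the Merge method above: when one attempt returns only part of a fetchWant, the next attempt's result is merged with it, keeping the older records in front, summing the fetched byte counts, and preferring the newer result's watermark and offset metadata. A minimal, self-contained sketch of that behaviour (illustrative only, using simplified stand-in types rather than the actual fetchResult or kgo.Record):

package main

import "fmt"

// fetchResultLite is a hypothetical stand-in for the patch's fetchResult.
type fetchResultLite struct {
	records       []string // stands in for []*kgo.Record
	fetchedBytes  int
	highWatermark int64
}

// merge mirrors fetchResult.Merge: older records go first, byte counts add up,
// and the newer result's metadata (e.g. the high watermark) is kept as-is.
func (fr fetchResultLite) merge(older fetchResultLite) fetchResultLite {
	fr.records = append(older.records, fr.records...)
	fr.fetchedBytes += older.fetchedBytes
	return fr
}

func main() {
	older := fetchResultLite{records: []string{"r1", "r2"}, fetchedBytes: 20, highWatermark: 5}
	newer := fetchResultLite{records: []string{"r3"}, fetchedBytes: 10, highWatermark: 6}

	merged := newer.merge(older)
	fmt.Println(merged.records, merged.fetchedBytes, merged.highWatermark) // [r1 r2 r3] 30 6
}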

 func newEmptyFetchResult(ctx context.Context, err error) fetchResult {
 	return fetchResult{
 		ctx: ctx,
@@ -944,8 +969,10 @@ func (r *concurrentFetchers) pollFetches(ctx context.Context) (kgo.Fetches, cont
 		firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord)
 		r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime)

-		r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset
 		f.Records = f.Records[firstUnreturnedRecordIdx:]
+		if len(f.Records) > 0 {
+			r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset
+		}

 		return kgo.Fetches{{
 			Topics: []kgo.FetchTopic{
@@ -964,7 +991,7 @@ func recordIndexAfterOffset(records []*kgo.Record, offset int64) int {
 			return i
 		}
 	}
-	return len(records) - 1
+	return len(records)
 }

 func (r *concurrentFetchers) recordOrderedFetchTelemetry(f fetchResult, firstReturnedRecordIndex int, waitStartTime time.Time) {
@@ -1014,7 +1041,7 @@ func (r *concurrentFetchers) fetchSingle(ctx context.Context, fw fetchWant) (fr
 func (r *concurrentFetchers) buildFetchRequest(fw fetchWant, leaderEpoch int32) kmsg.FetchRequest {
 	req := kmsg.NewFetchRequest()
-	req.MinBytes = 1
+	req.MinBytes = 1 // Warpstream ignores this field. This means that the WaitTime below is always waited and MaxBytes plays a bigger role in how fast Warpstream responds.
 	req.Version = 13
 	req.MaxWaitMillis = int32(r.minBytesWaitTime / time.Millisecond)
 	req.MaxBytes = fw.MaxBytes()
@@ -1110,27 +1137,22 @@ func (r *concurrentFetchers) runFetcher(ctx context.Context, fetchersWg *sync.Wa
 		MaxRetries: 0, // retry forever
 	})

-	// more aggressive backoff when we're waiting for records to be produced.
-	// It's likely there's already some records produced by the time we get back the response and send another request.
-	newRecordsProducedBackoff := backoff.New(ctx, backoff.Config{
-		MinBackoff: 10 * time.Millisecond,
-		MaxBackoff: time.Second,
-		MaxRetries: 0, // retry forever
-	})
-
 	for w := range wants {
 		// Start new span for each fetchWant. We want to record the lifecycle of a single record from being fetched to being ingested.
 		wantSpan, ctx := spanlogger.NewWithLogger(ctx, logger, "concurrentFetcher.fetch")
 		wantSpan.SetTag("start_offset", w.startOffset)
 		wantSpan.SetTag("end_offset", w.endOffset)

+		var previousResult fetchResult
 		for attempt := 0; errBackoff.Ongoing() && w.endOffset > w.startOffset; attempt++ {
-			attemptSpan, ctx := spanlogger.NewWithLogger(ctx, wantSpan, "concurrentFetcher.fetch.attempt")
+			attemptSpan, ctx := spanlogger.NewWithLogger(ctx, logger, "concurrentFetcher.fetch.attempt")
 			attemptSpan.SetTag("attempt", attempt)

 			f := r.fetchSingle(ctx, w)
+			f = f.Merge(previousResult)
+			previousResult = f
 			if f.Err != nil {
-				w = handleKafkaFetchErr(f.Err, w, errBackoff, newRecordsProducedBackoff, r.startOffsets, r.client, attemptSpan)
+				w = handleKafkaFetchErr(f.Err, w, errBackoff, r.startOffsets, r.client, attemptSpan)
 			}
 			if len(f.Records) == 0 {
 				// Typically if we had an error, then there wouldn't be any records.
@@ -1144,12 +1166,21 @@ func (r *concurrentFetchers) runFetcher(ctx context.Context, fetchersWg *sync.Wa
 			// We reset the backoff if we received any records whatsoever. A received record means _some_ success.
 			// We don't want to slow down until we hit a larger error.
 			errBackoff.Reset()
-			newRecordsProducedBackoff.Reset()
-			f.startWaitingForConsumption()
 			select {
 			case w.result <- f:
+				previousResult = fetchResult{}
 			case <-ctx.Done():
+			default:
+				if w.startOffset >= w.endOffset {
+					// We've fetched all we were asked for; the whole batch is ready, and we definitely have to wait to send on the channel now.
+					f.startWaitingForConsumption()
+					select {
+					case w.result <- f:
+						previousResult = fetchResult{}
+					case <-ctx.Done():
+					}
+				}
 			}
 			attemptSpan.Finish()
 		}
@@ -1233,7 +1264,7 @@ type metadataRefresher interface {
 // handleKafkaFetchErr handles all the errors listed in the franz-go documentation as possible errors when fetching records.
 // For most of them we just apply a backoff. They are listed here so we can be explicit in what we're handling and how.
 // It may also return an adjusted fetchWant in case the error indicated we were consuming not yet produced records or records already deleted due to retention.
-func handleKafkaFetchErr(err error, fw fetchWant, shortBackoff, longBackoff waiter, partitionStartOffset *genericOffsetReader[int64], refresher metadataRefresher, logger log.Logger) fetchWant {
+func handleKafkaFetchErr(err error, fw fetchWant, longBackoff waiter, partitionStartOffset *genericOffsetReader[int64], refresher metadataRefresher, logger log.Logger) fetchWant {
 	// Typically franz-go will update its own metadata when it detects a change in brokers. But it's hard to verify this.
 	// So we force a metadata refresh here to be sure.
 	// It's ok to call this from multiple fetchers concurrently. franz-go will only be sending one metadata request at a time (whether automatic, periodic, or forced).
@@ -1269,7 +1300,6 @@ func handleKafkaFetchErr(err error, fw fetchWant, longBackoff wait
 		// We set a MaxWaitMillis on fetch requests, but even then there may be no records for some time.
 		// Wait for a short time to allow the broker to catch up or for new records to be produced.
 		level.Debug(logger).Log("msg", "offset out of range; waiting for new records to be produced")
-		shortBackoff.Wait()
 	}
 	case errors.Is(err, kerr.TopicAuthorizationFailed):
 		longBackoff.Wait()
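The select with a default branch added to runFetcher above implements a try-then-block send: the fetcher first attempts a non-blocking send so it can keep accumulating records for a still-incomplete fetchWant, and only blocks on the result channel once the want has been fully fetched (or the context is cancelled). A rough sketch of that pattern with plain strings instead of kgo records; the trySend helper and the channel here are illustrative, not part of the patch:

package main

import (
	"context"
	"fmt"
	"time"
)

// trySend first attempts a non-blocking send so the caller can keep fetching;
// only when the batch is complete (done == true) does it block until the
// consumer takes the result or the context is cancelled.
func trySend(ctx context.Context, results chan<- []string, batch []string, done bool) bool {
	select {
	case results <- batch:
		return true
	case <-ctx.Done():
		return false
	default:
		if !done {
			// Not complete yet: go fetch more and retry later with a bigger batch.
			return false
		}
		select {
		case results <- batch:
			return true
		case <-ctx.Done():
			return false
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	results := make(chan []string) // unbuffered, like the patch's result channel

	go func() {
		time.Sleep(100 * time.Millisecond) // consumer is not ready immediately
		fmt.Println("consumed:", <-results)
	}()

	fmt.Println("incomplete batch sent:", trySend(ctx, results, []string{"r1"}, false))       // false: consumer busy, keep fetching
	fmt.Println("completed batch sent:", trySend(ctx, results, []string{"r1", "r2"}, true)) // true: blocks until consumed
}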
diff --git a/pkg/storage/ingest/reader_client.go b/pkg/storage/ingest/reader_client.go
index 24bb106119..a4d5f99151 100644
--- a/pkg/storage/ingest/reader_client.go
+++ b/pkg/storage/ingest/reader_client.go
@@ -3,8 +3,6 @@ package ingest
 import (
-	"time"
-
 	"github.com/go-kit/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
@@ -20,7 +18,7 @@ func NewKafkaReaderClient(cfg KafkaConfig, metrics *kprom.Metrics, logger log.Lo
 	opts = append(opts,
 		kgo.FetchMinBytes(1),
 		kgo.FetchMaxBytes(fetchMaxBytes), // these are unused by concurrent fetchers
-		kgo.FetchMaxWait(5*time.Second),
+		kgo.FetchMaxWait(defaultMinBytesMaxWaitTime),
 		kgo.FetchMaxPartitionBytes(50_000_000), // these are unused by concurrent fetchers

 		// BrokerMaxReadBytes sets the maximum response size that can be read from
diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go
index ebf1de7f0f..7a7725246c 100644
--- a/pkg/storage/ingest/reader_test.go
+++ b/pkg/storage/ingest/reader_test.go
@@ -219,7 +219,7 @@ func TestPartitionReader_WaitReadConsistencyUntilLastProducedOffset_And_WaitRead
 	setup := func(t *testing.T, consumer recordConsumer, opts ...readerTestCfgOtp) (*PartitionReader, *kgo.Client, *prometheus.Registry) {
 		reg := prometheus.NewPedanticRegistry()

-		_, clusterAddr := testkafka.CreateCluster(t, 1, topicName)
+		_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)

 		// Configure the reader to poll the "last produced offset" frequently.
 		reader := createAndStartReader(ctx, t, clusterAddr, topicName, partitionID, consumer,
@@ -1852,7 +1852,7 @@ func createReader(t *testing.T, addr string, topicName string, partitionID int32
 	require.NoError(t, err)

 	// Reduce the time the fake kafka would wait for new records. Sometimes this blocks startup.
-	reader.concurrentFetchersMinBytesMaxWaitTime = time.Second
+	reader.concurrentFetchersMinBytesMaxWaitTime = 500 * time.Millisecond

 	return reader
 }
@@ -1980,8 +1980,7 @@ func TestHandleKafkaFetchErr(t *testing.T) {
 		fw         fetchWant
 		expectedFw fetchWant

-		expectedShortBackoff    bool
-		expectedLongBackoff     bool
+		expectedBackoff         bool
 		expectedMetadataRefresh bool
 	}{
 		"no error": {
@@ -2031,7 +2030,7 @@ func TestHandleKafkaFetchErr(t *testing.T) {
 				startOffset: 11,
 				endOffset:   15,
 			},
-			expectedLongBackoff: true,
+			expectedBackoff: true,
 		},
 		"NotLeaderForPartition": {
 			err: kerr.NotLeaderForPartition,
@@ -2044,7 +2043,7 @@ func TestHandleKafkaFetchErr(t *testing.T) {
 				startOffset: 11,
 				endOffset:   15,
 			},
-			expectedLongBackoff:     true,
+			expectedBackoff:         true,
 			expectedMetadataRefresh: true,
 		},
 		"ReplicaNotAvailable": {
@@ -2058,7 +2057,7 @@ func TestHandleKafkaFetchErr(t *testing.T) {
 				startOffset: 11,
 				endOffset:   15,
 			},
-			expectedLongBackoff:     true,
+			expectedBackoff:         true,
 			expectedMetadataRefresh: true,
 		},
 		"UnknownLeaderEpoch": {
@@ -2072,7 +2071,7 @@ func TestHandleKafkaFetchErr(t *testing.T) {
 				startOffset: 11,
 				endOffset:   15,
 			},
-			expectedLongBackoff:     true,
+			expectedBackoff:         true,
 			expectedMetadataRefresh: true,
 		},
 		"FencedLeaderEpoch": {
@@ -2086,7 +2085,7 @@ func TestHandleKafkaFetchErr(t *testing.T) {
 				startOffset: 11,
 				endOffset:   15,
 			},
-			expectedLongBackoff:     true,
+			expectedBackoff:         true,
 			expectedMetadataRefresh: true,
 		},
 		"LeaderNotAvailable": {
@@ -2100,7 +2099,7 @@ func TestHandleKafkaFetchErr(t *testing.T) {
 				startOffset: 11,
 				endOffset:   15,
 			},
-			expectedLongBackoff:     true,
+			expectedBackoff:         true,
 			expectedMetadataRefresh: true,
 		},
 		"errUnknownPartitionLeader": {
@@ -2114,18 +2113,15 @@ func TestHandleKafkaFetchErr(t *testing.T) {
 				startOffset: 11,
 				endOffset:   15,
 			},
-			expectedLongBackoff:     true,
+			expectedBackoff:         true,
 			expectedMetadataRefresh: true,
 		},
 	}

 	for testName, testCase := range tests {
 		t.Run(testName, func(t *testing.T) {
-			require.False(t, testCase.expectedShortBackoff && testCase.expectedLongBackoff, "set either long or short backoff")
-			waitedShort := false
-			shortBackOff := waiterFunc(func() { waitedShort = true })
-			waitedLong := false
-			longBackOff := waiterFunc(func() { waitedLong = true })
+			waitedBackoff := false
+			backoff := waiterFunc(func() { waitedBackoff = true })

 			refreshed := false
 			refresher := refresherFunc(func() { refreshed = true })
@@ -2137,15 +2133,282 @@ func TestHandleKafkaFetchErr(t *testing.T) {
 				require.NoError(t, services.StopAndAwaitTerminated(context.Background(), offsetR))
 			})

-			actualFw := handleKafkaFetchErr(testCase.err, testCase.fw, shortBackOff, longBackOff, offsetR, refresher, logger)
+			actualFw := handleKafkaFetchErr(testCase.err, testCase.fw, backoff, offsetR, refresher, logger)
 			assert.Equal(t, testCase.expectedFw, actualFw)
-			assert.Equal(t, testCase.expectedShortBackoff, waitedShort)
-			assert.Equal(t, testCase.expectedLongBackoff, waitedLong)
+			assert.Equal(t, testCase.expectedBackoff, waitedBackoff)
 			assert.Equal(t, testCase.expectedMetadataRefresh, refreshed)
 		})
 	}
 }

+func TestConcurrentFetchers(t *testing.T) {
+	const (
+		topicName       = "test-topic"
+		partitionID     = 1
+		recordsPerFetch = 3
+		concurrency     = 2
+	)
+
+	t.Run("respect context cancellation", func(t *testing.T) {
+		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+		defer cancel()
+
+		_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
+		client := newKafkaProduceClient(t, clusterAddr)
+
+		fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)
+
+		// This should not block forever now
+		fetches, fetchCtx := fetchers.pollFetches(ctx)
+
+		assert.Zero(t, fetches.NumRecords())
+		assert.Error(t, fetchCtx.Err(), "Expected context to be cancelled")
+	})
+
+	t.Run("cold replay", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
+		client := newKafkaProduceClient(t, clusterAddr)
+
+		// Produce some records before starting the fetchers
+		for i := 0; i < 5; i++ {
+			produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
+		}
+
+		fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)
+
+		fetches, _ := fetchers.pollFetches(ctx)
+		assert.Equal(t, fetches.NumRecords(), 5)
+	})
+
+	t.Run("fetch records produced after startup", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
+		client := newKafkaProduceClient(t, clusterAddr)
+
+		fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)
+
+		// Produce some records after starting the fetchers
+		for i := 0; i < 3; i++ {
+			produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
+		}
+
+		fetches, _ := fetchers.pollFetches(ctx)
+		assert.Equal(t, fetches.NumRecords(), 3)
+	})
+
+	t.Run("slow processing of fetches", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
+		client := newKafkaProduceClient(t, clusterAddr)
+
+		fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)
+
+		// Produce some records
+		for i := 0; i < 5; i++ {
+			produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
+		}
+
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			consumedRecords := 0
+			for consumedRecords < 10 {
+				fetches, _ := fetchers.pollFetches(ctx)
+				time.Sleep(1000 * time.Millisecond) // Simulate slow processing
+				consumedRecords += fetches.NumRecords()
+			}
+			assert.Equal(t, 10, consumedRecords)
+		}()
+
+		// Produce more records while processing is slow
+		for i := 5; i < 10; i++ {
+			produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
+		}
+
+		wg.Wait()
+	})
+
+	t.Run("fast processing of fetches", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
+		client := newKafkaProduceClient(t, clusterAddr)
+
+		fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)
+
+		// Produce some records
+		for i := 0; i < 10; i++ {
+			produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
+		}
+
+		var wg sync.WaitGroup
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			consumedRecords := 0
+			for consumedRecords < 10 {
+				fetches, _ := fetchers.pollFetches(ctx)
+				consumedRecords += fetches.NumRecords()
+				// no processing delay
+			}
+			assert.Equal(t, 10, consumedRecords)
+		}()
+
+		wg.Wait()
+	})
+
+	t.Run("fetch with different concurrency levels", func(t *testing.T) {
+		for _, concurrency := range []int{1, 2, 4} {
+			t.Run(fmt.Sprintf("concurrency-%d", concurrency), func(t *testing.T) {
+				ctx, cancel := context.WithCancel(context.Background())
+				defer cancel()
+
+				_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
+				client := newKafkaProduceClient(t, clusterAddr)
+
+				fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, 2)
+
+				// Produce some records
+				for i := 0; i < 20; i++ {
+					produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
+				}
+
+				var totalRecords int
+				for totalRecords < 20 {
+					fetches, _ := fetchers.pollFetches(ctx)
+					totalRecords += fetches.NumRecords()
+				}
+
+				assert.Equal(t, 20, totalRecords)
+			})
+		}
+	})
+
+	t.Run("start from mid-stream offset", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
+		client := newKafkaProduceClient(t, clusterAddr)
+
+		// Produce some initial records
+		for i := 0; i < 5; i++ {
+			produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
+		}
+
+		// Get the offset of the last produced record
+		lastOffset := produceRecord(ctx, t, client, topicName, partitionID, []byte("last-initial-record"))
+
+		// Start fetchers from the offset after the initial records
+		fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, lastOffset-1, concurrency, recordsPerFetch)
+
+		// Produce some more records
+		for i := 0; i < 3; i++ {
+			produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("new-record-%d", i)))
+		}
+
+		const expectedRecords = 5
+		fetchedRecordsContents := make([]string, 0, expectedRecords)
+		for len(fetchedRecordsContents) < expectedRecords {
+			fetches, _ := fetchers.pollFetches(ctx)
+			fetches.EachRecord(func(r *kgo.Record) {
+				fetchedRecordsContents = append(fetchedRecordsContents, string(r.Value))
+			})
+		}
+
+		assert.Equal(t, []string{
+			"record-4",
+			"last-initial-record",
+			"new-record-0",
+			"new-record-1",
+			"new-record-2",
+		}, fetchedRecordsContents)
+	})
+
+	t.Run("synchronous produce and fetch", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
+		client := newKafkaProduceClient(t, clusterAddr)
+
+		fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, recordsPerFetch)
+
+		for round := 0; round < 3; round++ {
+			t.Log("starting round", round)
+			const recordsPerRound = 4
+
+			// Produce a few records
+			expectedRecords := make([]string, 0, recordsPerRound)
+			for i := 0; i < recordsPerRound; i++ {
+				rec := []byte(fmt.Sprintf("round-%d-record-%d", round, i))
+				expectedRecords = append(expectedRecords, string(rec))
+				producedOffset := produceRecord(ctx, t, client, topicName, partitionID, rec)
+				t.Log("produced", producedOffset, string(rec))
+			}
+
+			// Poll for fetches and verify
+			fetchedRecords := make([]string, 0, recordsPerRound)
+			for len(fetchedRecords) < recordsPerRound {
+				fetches, _ := fetchers.pollFetches(ctx)
+				fetches.EachRecord(func(r *kgo.Record) {
+					fetchedRecords = append(fetchedRecords, string(r.Value))
+					t.Log("fetched", r.Offset, string(r.Value))
+				})
+			}
+
+			// Verify fetched records
+			assert.Equal(t, expectedRecords, fetchedRecords, "Fetched records in round %d do not match expected", round)
+		}
+	})
+
+}
+
+func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency, recordsPerFetch int) *concurrentFetchers {
+	logger := log.NewNopLogger()
+	metrics := newReaderMetrics(partition, prometheus.NewRegistry())
+
+	// This instantiates the fields of kprom.
+	// This is usually done by franz-go, but since we now use the metrics ourselves, we need to instantiate them ourselves.
+	metrics.kprom.OnNewClient(client)
+
+	offsetReader := &partitionOffsetClient{
+		client: client,
+		topic:  topic,
+	}
+
+	startOffsetsReader := newGenericOffsetReader(func(ctx context.Context) (int64, error) {
+		return offsetReader.FetchPartitionStartOffset(ctx, partition)
+	}, time.Second, logger)
+
+	f, err := newConcurrentFetchers(
+		ctx,
+		client,
+		logger,
+		topic,
+		partition,
+		startOffset,
+		concurrency,
+		recordsPerFetch,
+		false,
+		time.Second, // same order of magnitude as the real one (defaultMinBytesMaxWaitTime), but faster for tests
+		offsetReader,
+		startOffsetsReader,
+		&metrics,
+	)
+	require.NoError(t, err)
+
+	return f
+}
+
 type waiterFunc func()

 func (w waiterFunc) Wait() { w() }
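One more illustration, this time of the recordIndexAfterOffset change in reader.go (returning len(records) instead of len(records)-1): with the old return value, a fetch whose records had all been returned already would re-deliver its last record, because records[len(records)-1:] still contains one element. A small, self-contained sketch of the fixed behaviour, using a simplified record type in place of *kgo.Record:

package main

import "fmt"

// record is a hypothetical stand-in for *kgo.Record; only the offset matters here.
type record struct{ offset int64 }

// recordIndexAfterOffset mirrors the fixed helper: it returns the index of the
// first record strictly after offset, or len(records) when there is none, so
// that records[idx:] is empty rather than repeating the last record.
func recordIndexAfterOffset(records []record, offset int64) int {
	for i, r := range records {
		if r.offset > offset {
			return i
		}
	}
	return len(records)
}

func main() {
	recs := []record{{offset: 10}, {offset: 11}, {offset: 12}}

	fmt.Println(recordIndexAfterOffset(recs, 10)) // 1: records at offsets 11 and 12 are still unreturned
	fmt.Println(recordIndexAfterOffset(recs, 12)) // 3: everything was already returned; recs[3:] is empty
}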