Skip to content

Commit

Permalink
Fix bugs in tracking lastReturnedRecord
Browse files Browse the repository at this point in the history
Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitarvdimitrov committed Sep 26, 2024
1 parent 488c773 commit e7186a0
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 13 deletions.
6 changes: 4 additions & 2 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,8 +968,10 @@ func (r *concurrentFetchers) pollFetches(ctx context.Context) (kgo.Fetches, cont
firstUnreturnedRecordIdx := recordIndexAfterOffset(f.Records, r.lastReturnedRecord)
r.recordOrderedFetchTelemetry(f, firstUnreturnedRecordIdx, waitStartTime)

r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset
f.Records = f.Records[firstUnreturnedRecordIdx:]
if len(f.Records) > 0 {
r.lastReturnedRecord = f.Records[len(f.Records)-1].Offset
}

return kgo.Fetches{{
Topics: []kgo.FetchTopic{
Expand All @@ -988,7 +990,7 @@ func recordIndexAfterOffset(records []*kgo.Record, offset int64) int {
return i
}
}
return len(records) - 1
return len(records)
}

func (r *concurrentFetchers) recordOrderedFetchTelemetry(f fetchResult, firstReturnedRecordIndex int, waitStartTime time.Time) {
Expand Down
53 changes: 42 additions & 11 deletions pkg/storage/ingest/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2217,14 +2217,15 @@ func TestConcurrentFetchers(t *testing.T) {

var wg sync.WaitGroup
wg.Add(1)
consumedRecords := 0
go func() {
defer wg.Done()
for consumedRecords < 5 {
consumedRecords := 0
for consumedRecords < 10 {
fetches, _ := fetchers.pollFetches(ctx)
time.Sleep(100 * time.Millisecond) // Simulate slow processing
time.Sleep(1000 * time.Millisecond) // Simulate slow processing
consumedRecords += fetches.NumRecords()
}
assert.Equal(t, 10, consumedRecords)
}()

// Produce more records while processing is slow
Expand All @@ -2233,13 +2234,6 @@ func TestConcurrentFetchers(t *testing.T) {
}

wg.Wait()

// Collect back the rest of the records; maybe there are none.
for consumedRecords < 10 {
fetches, _ := fetchers.pollFetches(ctx)
consumedRecords += fetches.NumRecords()
}
assert.Equal(t, consumedRecords, 10)
})

t.Run("fast processing of fetches", func(t *testing.T) {
Expand Down Expand Up @@ -2339,6 +2333,43 @@ func TestConcurrentFetchers(t *testing.T) {
"new-record-2",
}, fetchedRecordsContents)
})

t.Run("synchronous produce and fetch", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, clusterAddr := testkafka.CreateCluster(t, partitionID+1, topicName)
client := newKafkaProduceClient(t, clusterAddr)

fetchers := createConcurrentFetchers(t, ctx, client, topicName, partitionID, 0, concurrency, recordsPerFetch)

for round := 0; round < 3; round++ {
t.Log("starting round", round)
const recordsPerRound = 4
// Produce a few records
expectedRecords := make([]string, 0, recordsPerRound)
for i := 0; i < recordsPerRound; i++ {
rec := []byte(fmt.Sprintf("round-%d-record-%d", round, i))
expectedRecords = append(expectedRecords, string(rec))
producedOffset := produceRecord(ctx, t, client, topicName, partitionID, rec)
t.Log("produced", producedOffset, string(rec))
}

// Poll for fetches and verify
fetchedRecords := make([]string, 0, recordsPerRound)
for len(fetchedRecords) < recordsPerRound {
fetches, _ := fetchers.pollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecords = append(fetchedRecords, string(r.Value))
t.Log("fetched", r.Offset, string(r.Value))
})
}

// Verify fetched records
assert.Equal(t, expectedRecords, fetchedRecords, "Fetched records in round %d do not match expected", round)
}
})

}

func createConcurrentFetchers(t *testing.T, ctx context.Context, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency, recordsPerFetch int) *concurrentFetchers {
Expand Down Expand Up @@ -2368,7 +2399,7 @@ func createConcurrentFetchers(t *testing.T, ctx context.Context, client *kgo.Cli
concurrency,
recordsPerFetch,
false,
defaultMinBytesWaitTime,
time.Second, // same order of magnitude as the real one (defaultMinBytesWaitTime), but faster for tests
offsetReader,
startOffsetsReader,
&metrics,
Expand Down

0 comments on commit e7186a0

Please sign in to comment.