From b964ae482de062f8615ec028cebf530564b38004 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 2 Oct 2024 17:56:36 +0200 Subject: [PATCH] ingest storage: don't return an error when records are successfully ingested This fixes a flaky TestPartitionReader_Commit because we'd shut down the reader. The bug was introduced in XXX Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/reader.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 6c14894407..8a5c36e355 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -503,7 +503,8 @@ func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetche err := consumer.Consume(consumeCtx, records) if err == nil { level.Debug(logger).Log("msg", "closing consumer after successful consumption") - break + // The context might have been cancelled in the meantime, so we return here instead of breaking the loop and returning the context error + return nil } level.Error(logger).Log( "msg", "encountered error while ingesting data from Kafka; should retry",