Skip to content

Commit

Permalink
ingest storage: proper shutdown of partitionCommitter (#9436)
Browse files Browse the repository at this point in the history
This `partitionCommitter` would be shut down via the services manager as soon as the service context is cancelled. This means that they shut down in parallel with the `PartitionReader`. The race comes when the `partitionCommitter` has already shut down while the `PartitionReader` is still processing some records. Then when the `PartitionReader` tries to `enqueueCommit`, that sets the atomic, but does not send this to Kafka.

As a result we may not always persist the latest commit to Kafka.

Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitarvdimitrov authored Oct 1, 2024
1 parent 020fdf7 commit ab91e9d
Showing 1 changed file with 3 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
if err != nil {
return errors.Wrap(err, "creating service manager")
}
err = services.StartManagerAndAwaitHealthy(ctx, r.dependencies)
// Use context.Background() because we want to stop all dependencies when the PartitionReader stops
// instead of stopping them when ctx is cancelled and while the PartitionReader is still running.
err = services.StartManagerAndAwaitHealthy(context.Background(), r.dependencies)
if err != nil {
return errors.Wrap(err, "starting service manager")
}
Expand Down

0 comments on commit ab91e9d

Please sign in to comment.