Commit
Add option to specify from which position to consume a partition at startup (#7685)

* Add option to specify from which position to consume a partition at startup

Signed-off-by: Marco Pracucci <[email protected]>

* Addressed review feedback

Signed-off-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
pracucci authored Mar 22, 2024
1 parent f573a03 commit ea1893a
Showing 4 changed files with 332 additions and 40 deletions.
25 changes: 22 additions & 3 deletions pkg/storage/ingest/config.go
@@ -5,12 +5,24 @@ package ingest
 import (
 	"errors"
 	"flag"
+	"fmt"
+	"slices"
+	"strings"
 	"time"
 )
 
+const (
+	consumeFromLastOffset = "last-offset"
+	consumeFromStart      = "start"
+	consumeFromEnd        = "end"
+)
+
 var (
-	ErrMissingKafkaAddress = errors.New("the Kafka address has not been configured")
-	ErrMissingKafkaTopic   = errors.New("the Kafka topic has not been configured")
+	ErrMissingKafkaAddress    = errors.New("the Kafka address has not been configured")
+	ErrMissingKafkaTopic      = errors.New("the Kafka topic has not been configured")
+	ErrInvalidConsumePosition = errors.New("the configured consume position is invalid")
+
+	consumeFromPositionOptions = []string{consumeFromLastOffset, consumeFromStart, consumeFromEnd}
 )
 
 type Config struct {
@@ -48,7 +60,9 @@ type KafkaConfig struct {
 
 	LastProducedOffsetPollInterval time.Duration `yaml:"last_produced_offset_poll_interval"`
 	LastProducedOffsetRetryTimeout time.Duration `yaml:"last_produced_offset_retry_timeout"`
-	MaxConsumerLagAtStartup        time.Duration `yaml:"max_consumer_lag_at_startup"`
+
+	ConsumeFromPositionAtStartup string        `yaml:"consume_from_position_at_startup"`
+	MaxConsumerLagAtStartup      time.Duration `yaml:"max_consumer_lag_at_startup"`
 }
 
 func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) {
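The yaml tag on the new struct field above means the option is exposed as a consume_from_position_at_startup key in YAML configuration. A minimal standalone sketch of that mapping (the trimmed-down kafkaConfig type and the use of gopkg.in/yaml.v3 are illustrative assumptions, not how Mimir actually loads its config):

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// kafkaConfig is a stand-in for the real KafkaConfig, reduced to the new field.
type kafkaConfig struct {
	ConsumeFromPositionAtStartup string `yaml:"consume_from_position_at_startup"`
}

func main() {
	raw := []byte("consume_from_position_at_startup: end\n")

	var cfg kafkaConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.ConsumeFromPositionAtStartup) // prints "end"
}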
@@ -64,6 +78,8 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
 
 	f.DurationVar(&cfg.LastProducedOffsetPollInterval, prefix+".last-produced-offset-poll-interval", time.Second, "How frequently to poll the last produced offset, used to enforce strong read consistency.")
 	f.DurationVar(&cfg.LastProducedOffsetRetryTimeout, prefix+".last-produced-offset-retry-timeout", 10*time.Second, "How long to retry a failed request to get the last produced offset.")
+
+	f.StringVar(&cfg.ConsumeFromPositionAtStartup, prefix+".consume-from-position-at-startup", consumeFromLastOffset, fmt.Sprintf("From which position to start consuming the partition at startup. Supported options: %s.", strings.Join(consumeFromPositionOptions, ", ")))
 	f.DurationVar(&cfg.MaxConsumerLagAtStartup, prefix+".max-consumer-lag-at-startup", 15*time.Second, "The maximum tolerated lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set 0 to disable waiting for maximum consumer lag being honored at startup.")
 }
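On the command line, the registration above yields a -<prefix>.consume-from-position-at-startup flag defaulting to last-offset. A runnable sketch of the same flag semantics (the bare flag.FlagSet and the "kafka" prefix are assumptions; Mimir's real prefix is supplied by the caller of RegisterFlagsWithPrefix):

package main

import (
	"flag"
	"fmt"
)

func main() {
	fs := flag.NewFlagSet("example", flag.ContinueOnError)

	// Mirrors the f.StringVar registration above, with an assumed "kafka" prefix.
	pos := fs.String("kafka.consume-from-position-at-startup", "last-offset",
		"From which position to start consuming the partition at startup. Supported options: last-offset, start, end.")

	if err := fs.Parse([]string{"-kafka.consume-from-position-at-startup=end"}); err != nil {
		panic(err)
	}
	fmt.Println(*pos) // prints "end"
}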

@@ -74,6 +90,9 @@ func (cfg *KafkaConfig) Validate() error {
 	if cfg.Topic == "" {
 		return ErrMissingKafkaTopic
 	}
+	if !slices.Contains(consumeFromPositionOptions, cfg.ConsumeFromPositionAtStartup) {
+		return ErrInvalidConsumePosition
+	}
 
 	return nil
 }
9 changes: 9 additions & 0 deletions pkg/storage/ingest/config_test.go
@@ -38,6 +38,15 @@ func TestConfig_Validate(t *testing.T) {
 				cfg.KafkaConfig.Topic = "test"
 			},
 		},
+		"should fail if ingest storage is enabled and consume position is invalid": {
+			setup: func(cfg *Config) {
+				cfg.Enabled = true
+				cfg.KafkaConfig.Address = "localhost"
+				cfg.KafkaConfig.Topic = "test"
+				cfg.KafkaConfig.ConsumeFromPositionAtStartup = "middle"
+			},
+			expectedErr: ErrInvalidConsumePosition,
+		},
 	}
 
 	for testName, testData := range tests {
77 changes: 49 additions & 28 deletions pkg/storage/ingest/reader.go
@@ -27,6 +27,9 @@ import (
 const (
 	// kafkaStartOffset is a special offset value that means the beginning of the partition.
 	kafkaStartOffset = int64(-2)
+
+	// kafkaEndOffset is a special offset value that means the end of the partition.
+	kafkaEndOffset = int64(-1)
 )
 
 type record struct {
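These sentinels follow the Kafka convention, and franz-go's kgo.NewOffset().At() recognizes them: per the kgo documentation, -2 means the earliest available offset and -1 the latest. A small sketch using only the public kgo API:

package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
	// At(-2) is equivalent to kgo.NewOffset().AtStart(), At(-1) to AtEnd();
	// any non-negative value means "begin at exactly this offset".
	fromStart := kgo.NewOffset().At(-2)
	fromEnd := kgo.NewOffset().At(-1)
	fromExact := kgo.NewOffset().At(42)

	_, _, _ = fromStart, fromEnd, fromExact
}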
@@ -94,20 +97,39 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
 		}
 	}()
 
-	startFromOffset, err := r.fetchLastCommittedOffsetWithRetries(ctx)
-	if err != nil {
-		return err
+	var startOffset int64
+	var err error
+
+	// Find the offset from which we should start consuming.
+	switch r.kafkaCfg.ConsumeFromPositionAtStartup {
+	case consumeFromStart:
+		startOffset = kafkaStartOffset
+		level.Info(r.logger).Log("msg", "starting consumption from partition start", "consumer_group", r.consumerGroup)
+
+	case consumeFromEnd:
+		startOffset = kafkaEndOffset
+		level.Warn(r.logger).Log("msg", "starting consumption from partition end (may cause data loss)", "consumer_group", r.consumerGroup)
+
+	default:
+		if offset, exists, err := r.fetchLastCommittedOffsetWithRetries(ctx); err != nil {
+			return err
+		} else if exists {
+			level.Info(r.logger).Log("msg", "starting consumption from last committed offset", "offset", offset, "consumer_group", r.consumerGroup)
+			startOffset = offset
+		} else {
+			level.Info(r.logger).Log("msg", "starting consumption from start because no committed offset has been found", "consumer_group", r.consumerGroup)
+			startOffset = kafkaStartOffset
+		}
 	}
-	level.Info(r.logger).Log("msg", "resuming consumption from offset", "offset", startFromOffset, "consumer_group", r.consumerGroup)
 
 	// Initialise the last consumed offset only if we've got a real offset from the consumer group.
 	// If we got a special offset (e.g. kafkaStartOffset) we want to keep the last consumed offset uninitialized,
 	// and it will be updated as soon as we consume the first record.
-	if startFromOffset >= 0 {
-		r.consumedOffsetWatcher.Notify(startFromOffset - 1)
+	if startOffset >= 0 {
+		r.consumedOffsetWatcher.Notify(startOffset - 1)
 	}
 
-	r.client, err = r.newKafkaReader(kgo.NewOffset().At(startFromOffset))
+	r.client, err = r.newKafkaReader(kgo.NewOffset().At(startOffset))
 	if err != nil {
 		return errors.Wrap(err, "creating kafka reader client")
 	}
@@ -126,8 +148,12 @@ func (r *PartitionReader) start(ctx context.Context) (returnErr error) {
 
 	// Enforce the max consumer lag (if enabled).
 	if maxLag := r.kafkaCfg.MaxConsumerLagAtStartup; maxLag > 0 {
-		if err := r.processNextFetchesUntilMaxLagHonored(ctx, maxLag); err != nil {
-			return err
+		if startOffset != kafkaEndOffset {
+			if err := r.processNextFetchesUntilMaxLagHonored(ctx, maxLag); err != nil {
+				return err
+			}
+		} else {
+			level.Info(r.logger).Log("msg", "partition reader is skipping to consume partition until max consumer lag is honored because it's going to consume the partition from the end")
 		}
 	}

@@ -378,7 +404,7 @@ func (r *PartitionReader) newKafkaReader(at kgo.Offset) (*kgo.Client, error) {
 	return client, nil
 }
 
-func (r *PartitionReader) fetchLastCommittedOffsetWithRetries(ctx context.Context) (offset int64, err error) {
+func (r *PartitionReader) fetchLastCommittedOffsetWithRetries(ctx context.Context) (offset int64, exists bool, err error) {
 	var (
 		retry = backoff.New(ctx, backoff.Config{
 			MinBackoff: 100 * time.Millisecond,
@@ -388,9 +414,9 @@ func (r *PartitionReader) fetchLastCommittedOffsetWithRetries(ctx context.Context) (offset int64, err error) {
 	)
 
 	for retry.Ongoing() {
-		offset, err = r.fetchLastCommittedOffset(ctx)
+		offset, exists, err = r.fetchLastCommittedOffset(ctx)
 		if err == nil {
-			return offset, nil
+			return offset, exists, nil
 		}
 
 		level.Warn(r.logger).Log("msg", "failed to fetch last committed offset", "err", err)
@@ -402,44 +428,39 @@ func (r *PartitionReader) fetchLastCommittedOffsetWithRetries(ctx context.Context) (offset int64, err error) {
 		err = retry.Err()
 	}
 
-	return offset, err
+	return 0, false, err
 }
 
 // fetchLastCommittedOffset returns the last consumed offset which has been committed by the PartitionReader
-// to the consumer group. If there is no offset committed, this function returns kafkaStartOffset constant,
-// which is a special value used to signal that partition should be consumed from the start.
-func (r *PartitionReader) fetchLastCommittedOffset(ctx context.Context) (int64, error) {
+// to the consumer group.
+func (r *PartitionReader) fetchLastCommittedOffset(ctx context.Context) (offset int64, exists bool, _ error) {
 	// We use an ephemeral client to fetch the offset and then create a new client with this offset.
 	// The reason for this is that changing the offset of an existing client requires to have used this client for fetching at least once.
 	// We don't want to do noop fetches just to warm up the client, so we create a new client instead.
 	cl, err := kgo.NewClient(commonKafkaClientOptions(r.kafkaCfg, r.metrics.kprom, r.logger)...)
 	if err != nil {
-		return 0, errors.Wrap(err, "unable to create admin client")
+		return 0, false, errors.Wrap(err, "unable to create admin client")
 	}
 	adm := kadm.NewClient(cl)
 	defer adm.Close()
 
 	offsets, err := adm.FetchOffsets(ctx, r.consumerGroup)
 	if errors.Is(err, kerr.GroupIDNotFound) || errors.Is(err, kerr.UnknownTopicOrPartition) {
-		// Make sure we replay any data already written to the partition in case the consumer
-		// is booting up for the first time ever.
-		return kafkaStartOffset, nil
+		return 0, false, nil
 	}
 	if err != nil {
-		return 0, errors.Wrap(err, "unable to fetch group offsets")
+		return 0, false, errors.Wrap(err, "unable to fetch group offsets")
 	}
 
-	offset, exists := offsets.Lookup(r.kafkaCfg.Topic, r.partitionID)
+	offsetRes, exists := offsets.Lookup(r.kafkaCfg.Topic, r.partitionID)
 	if !exists {
-		// Make sure we replay any data already written to the partition in case the consumer
-		// is booting up for the first time ever.
-		return kafkaStartOffset, nil
+		return 0, false, nil
 	}
-	if offset.Err != nil {
-		return 0, offset.Err
+	if offsetRes.Err != nil {
+		return 0, false, offsetRes.Err
 	}
 
-	return offset.At, nil
+	return offsetRes.At, true, nil
 }
 
 // WaitReadConsistency waits until all data produced up until now has been consumed by the reader.
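For reference, the kadm lookup used by fetchLastCommittedOffset follows this shape; a minimal standalone sketch (the broker address, topic, group name, and partition number are placeholders):

package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	adm := kadm.NewClient(cl)
	defer adm.Close()

	// FetchOffsets returns the offsets committed by the given consumer group.
	offsets, err := adm.FetchOffsets(context.Background(), "example-group")
	if err != nil {
		panic(err)
	}

	// Lookup reports whether partition 0 of "example-topic" has a committed offset.
	if res, ok := offsets.Lookup("example-topic", 0); ok && res.Err == nil {
		fmt.Println("committed offset:", res.At)
	} else {
		fmt.Println("no committed offset found")
	}
}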
(diff for the fourth changed file not rendered)
