Register block-builder target in mimir (#9315)
* blockbuilder: get rid of lag from section consumer loop

Signed-off-by: Vladimir Varankin <[email protected]>

* mimir: register block-builder target

Signed-off-by: Vladimir Varankin <[email protected]>

* rename internal variables

Signed-off-by: Vladimir Varankin <[email protected]>

---------

Signed-off-by: Vladimir Varankin <[email protected]>
narqo authored Sep 26, 2024
1 parent be0fe27 commit aa7f5b0
Showing 4 changed files with 67 additions and 71 deletions.
97 changes: 48 additions & 49 deletions pkg/blockbuilder/blockbuilder.go
@@ -110,30 +110,30 @@ func (b *BlockBuilder) stopping(_ error) error {
func (b *BlockBuilder) running(ctx context.Context) error {
// Do initial consumption on start using current time as the point up to which we are consuming.
// To avoid small blocks at startup, we consume until the <consume interval> boundary + buffer.
cycleEnd := cycleEndAtStartup(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
err := b.nextConsumeCycle(ctx, cycleEnd)
cycleEndTime := cycleEndAtStartup(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
err := b.nextConsumeCycle(ctx, cycleEndTime)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return err
}

cycleEnd, waitDur := nextCycleEnd(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
cycleEndTime, waitDur := nextCycleEnd(time.Now(), b.cfg.ConsumeInterval, b.cfg.ConsumeIntervalBuffer)
for {
select {
case <-time.After(waitDur):
level.Info(b.logger).Log("msg", "triggering next consume cycle", "cycle_end", cycleEnd)
err := b.nextConsumeCycle(ctx, cycleEnd)
level.Info(b.logger).Log("msg", "triggering next consume cycle", "cycle_end", cycleEndTime)
err := b.nextConsumeCycle(ctx, cycleEndTime)
if err != nil && !errors.Is(err, context.Canceled) {
// Fail the whole service in case of a non-recoverable error.
return fmt.Errorf("consume next cycle until cycle_end %s: %w", cycleEnd, err)
return fmt.Errorf("consume next cycle until cycle_end %s: %w", cycleEndTime, err)
}

// If we took more than ConsumeInterval to consume the records, this will immediately start the next consumption.
// TODO(codesome): track waitDur < 0, which is the time we ran over. Should have an alert on this.
cycleEnd = cycleEnd.Add(b.cfg.ConsumeInterval)
waitDur = time.Until(cycleEnd)
cycleEndTime = cycleEndTime.Add(b.cfg.ConsumeInterval)
waitDur = time.Until(cycleEndTime)
case <-ctx.Done():
level.Info(b.logger).Log("msg", "context cancelled, stopping")
return nil
@@ -169,7 +169,7 @@ func nextCycleEnd(t time.Time, interval, buffer time.Duration) (time.Time, time.
// nextConsumeCycle manages consumption of currently assigned partitions.
// The cycleEnd argument indicates the timestamp (relative to Kafka records) up until which to consume from partitions
// in this cycle. That is, Kafka records produced after the cycleEnd mark will be consumed in the next cycle.
func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time) error {
func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEndTime time.Time) error {
defer func(t time.Time) {
b.blockBuilderMetrics.consumeCycleDuration.Observe(time.Since(t).Seconds())
}(time.Now())
@@ -184,26 +184,26 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
// in the beginning of the cycle.
// Lag is the upper bound on the number of records we'll have to consume from Kafka to build the blocks.
// It's an "upper bound" because the consumption may be stopped earlier if we get records containing
// samples with timestamp greater than the cycleEnd timestamp.
// samples with timestamp greater than the cycleEndTime timestamp.
lag, err := b.getLagForPartition(ctx, partition)
if err != nil {
level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEnd)
level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEndTime)
continue
}
if err := lag.Err; err != nil {
level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEnd)
level.Error(b.logger).Log("msg", "failed to get partition lag", "err", err, "partition", partition, "cycle_end", cycleEndTime)
continue
}

b.blockBuilderMetrics.consumerLagRecords.WithLabelValues(fmt.Sprintf("%d", lag.Partition)).Set(float64(lag.Lag))

if lag.Lag <= 0 {
level.Info(b.logger).Log("msg", "nothing to consume in partition", "partition", partition, "offset", lag.Commit.At, "end_offset", lag.End.Offset, "lag", lag.Lag)
level.Info(b.logger).Log("msg", "nothing to consume in partition", "partition", partition, "commit_offset", lag.Commit.At, "start_offset", lag.Start.Offset, "end_offset", lag.End.Offset, "lag", lag.Lag)
continue
}

state := partitionStateFromLag(b.logger, lag, b.fallbackOffsetMillis)
if err := b.consumePartition(ctx, partition, state, cycleEnd); err != nil {
if err := b.consumePartition(ctx, partition, state, cycleEndTime, lag.End.Offset); err != nil {
level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "partition", partition)
}
}
@@ -239,8 +239,6 @@ func (b *BlockBuilder) getLagForPartition(ctx context.Context, partition int32)
}

type partitionState struct {
// Lag is the upperbound number of records the cycle will need to consume.
Lag int64
// Commit is the offset of the next record we'll start consuming.
Commit kadm.Offset
// CommitRecordTimestamp is the timestamp of the record whose offset was committed (and not the time of commit).
@@ -256,30 +254,31 @@ func partitionStateFromLag(logger log.Logger, lag kadm.GroupMemberLag, fallbackM
if err != nil {
// If there is an error in unmarshalling the metadata, treat it as if
// we have no commit. There is no reason to stop the cycle for this.
level.Error(logger).Log("msg", "error unmarshalling commit metadata", "err", err, "partition", lag.Partition, "offset", lag.Commit.At, "metadata", lag.Commit.Metadata)
level.Error(logger).Log("msg", "error unmarshalling commit metadata", "err", err, "partition", lag.Partition, "commit_offset", lag.Commit.At, "metadata", lag.Commit.Metadata)
}

if commitRecTs == 0 {
// If there was no commit metadata, we use the fallback to replay a set amount of
// records because it is non-trivial to peek at the first record in a partition to determine
// the range of replay required. Without knowing the range, we might end up trying to consume
// a lot of records in a single partition consumption call and end up in an OOM loop.
level.Info(logger).Log("msg", "no commit record timestamp in commit metadata; needs to fall back", "partition", lag.Partition, "offset", lag.Commit.At, "metadata", lag.Commit.Metadata, "fallback_millis", fallbackMillis)
level.Info(logger).Log("msg", "no commit record timestamp in commit metadata; needs to fall back", "partition", lag.Partition, "commit_offset", lag.Commit.At, "metadata", lag.Commit.Metadata, "fallback_millis", fallbackMillis)
commitRecTs = fallbackMillis
}

level.Debug(logger).Log(
level.Info(logger).Log(
"msg", "creating partition state",
"partition", lag.Partition,
"start_offset", lag.Start.Offset,
"end_offset", lag.End.Offset,
"lag", lag.Lag,
"commit_rec_ts", commitRecTs,
"commit_rec_offset", lag.Commit.At,
"commit_offset", lag.Commit.At,
"last_seen_offset", lastSeenOffset,
"last_block_end_ts", lastBlockEndTs,
)

return partitionState{
Lag: lag.Lag,
Commit: lag.Commit,
CommitRecordTimestamp: time.UnixMilli(commitRecTs),
LastSeenOffset: lastSeenOffset,
@@ -289,41 +288,49 @@

// consumePartition consumes records from the given partition until the cycleEnd timestamp.
// If the partition is lagging behind, it takes care of consuming it in sections.
func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state partitionState, cycleEnd time.Time) (err error) {
func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, state partitionState, cycleEndTime time.Time, cycleEndOffset int64) (err error) {
builder := NewTSDBBuilder(b.logger, b.cfg.DataDir, b.cfg.BlocksStorage, b.limits, b.tsdbBuilderMetrics)
defer runutil.CloseWithErrCapture(&err, builder, "closing tsdb builder")

// Section is a portion of the partition to process in a single pass. One cycle may process multiple sections if the partition is lagging.
sectionEnd := cycleEnd
if sectionEnd.Sub(state.CommitRecordTimestamp) > time.Duration(1.5*float64(b.cfg.ConsumeInterval)) {
sectionEndTime := cycleEndTime
if sectionEndTime.Sub(state.CommitRecordTimestamp) > time.Duration(1.5*float64(b.cfg.ConsumeInterval)) {
// We are lagging behind by more than 1.5*interval or there is no commit. We need to consume the partition in sections.
// We iterate through all the ConsumeInterval intervals, starting from the first one after the last commit until the cycleEnd,
// i.e. [T, T+interval), [T+interval, T+2*interval), ... [T+S*interval, cycleEnd)
// We iterate through all the ConsumeInterval intervals, starting from the first one after the last commit until the cycleEndTime,
// i.e. [T, T+interval), [T+interval, T+2*interval), ... [T+S*interval, cycleEndTime)
// where T is the CommitRecordTimestamp, the timestamp of the record whose offset we committed previously.
// When there is no kafka commit, we play it safe and assume LastSeenOffset and LastBlockEnd were 0, so as not to discard any samples unnecessarily.
sectionEnd, _ = nextCycleEnd(
sectionEndTime, _ = nextCycleEnd(
state.CommitRecordTimestamp,
b.cfg.ConsumeInterval,
b.cfg.ConsumeIntervalBuffer,
)
level.Info(b.logger).Log("msg", "partition is lagging behind the cycle", "partition", partition, "lag", state.Lag, "section_end", sectionEnd, "cycle_end", cycleEnd, "commit_rec_ts", state.CommitRecordTimestamp)
level.Info(b.logger).Log("msg", "partition is lagging behind the cycle", "partition", partition, "section_end", sectionEndTime, "cycle_end", cycleEndTime, "cycle_end_offset", cycleEndOffset, "commit_rec_ts", state.CommitRecordTimestamp)
}
for !sectionEnd.After(cycleEnd) {
partitionLogger := log.With(b.logger, "partition", partition, "lag", state.Lag, "section_end", sectionEnd)
state, err = b.consumePartitionSection(ctx, partitionLogger, builder, partition, state, sectionEnd)
for !sectionEndTime.After(cycleEndTime) {
logger := log.With(b.logger, "partition", partition, "section_end", sectionEndTime, "cycle_end_offset", cycleEndOffset)
state, err = b.consumePartitionSection(ctx, logger, builder, partition, state, sectionEndTime, cycleEndOffset)
if err != nil {
return fmt.Errorf("consume partition %d: %w", partition, err)
}
sectionEnd = sectionEnd.Add(b.cfg.ConsumeInterval)
sectionEndTime = sectionEndTime.Add(b.cfg.ConsumeInterval)
}

return nil
}

func (b *BlockBuilder) consumePartitionSection(ctx context.Context, logger log.Logger, builder *TSDBBuilder, partition int32, state partitionState, sectionEnd time.Time) (retState partitionState, retErr error) {
func (b *BlockBuilder) consumePartitionSection(
ctx context.Context,
logger log.Logger,
builder *TSDBBuilder,
partition int32,
state partitionState,
sectionEndTime time.Time,
cycleEndOffset int64,
) (retState partitionState, retErr error) {
// Unlike the section's range (and the cycle's range), which includes the ConsumeIntervalBuffer, the block's range doesn't.
// Thus, truncate the timestamp with ConsumeInterval here to round the block's range.
blockEnd := sectionEnd.Truncate(b.cfg.ConsumeInterval)
blockEnd := sectionEndTime.Truncate(b.cfg.ConsumeInterval)

var numBlocks int
defer func(t time.Time, startState partitionState) {
@@ -335,12 +342,12 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, logger log.L
dur := time.Since(t)

if retErr != nil {
level.Error(logger).Log("msg", "partition consumption failed", "err", retErr, "duration", dur, "curr_lag", retState.Lag)
level.Error(logger).Log("msg", "partition consumption failed", "err", retErr, "duration", dur)
return
}

b.blockBuilderMetrics.processPartitionDuration.WithLabelValues(fmt.Sprintf("%d", partition)).Observe(dur.Seconds())
level.Info(logger).Log("msg", "done consuming", "duration", dur, "curr_lag", retState.Lag,
level.Info(logger).Log("msg", "done consuming", "duration", dur,
"last_block_end", startState.LastBlockEnd, "curr_block_end", blockEnd,
"last_seen_offset", startState.LastSeenOffset, "curr_seen_offset", retState.LastSeenOffset,
"num_blocks", numBlocks)
@@ -366,15 +373,15 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, logger log.L
)

consumerLoop:
for remaining := state.Lag; remaining > 0; {
for recOffset := int64(-1); recOffset < cycleEndOffset-1; {
if err := context.Cause(ctx); err != nil {
return partitionState{}, err
}

// PollFetches can return a non-failed fetch with zero records. In such a case, with only the fetches at hand,
// we cannot tell if the consumer has already reached the latest end of the partition, i.e. no more records to consume,
// or if there is more data in the backlog and we must retry the poll. That's why the consumer loop above has to guard
// the iterations against the partitionState, so it retries the polling until the expected end of the partition is reached.
// the iterations against the cycleEndOffset, so it retries the polling until the expected end of the partition is reached.
fetches := b.kafkaClient.PollFetches(ctx)
fetches.EachError(func(_ string, _ int32, err error) {
if !errors.Is(err, context.Canceled) {
@@ -383,24 +390,17 @@
}
})

// If the partition had gaps in its offsets there is a potential edge-case here.
// In theory, the initial remaining value (i.e. the lag) can over-count how many records are available in partition
// to consume. Theoretically, if this is an inactive partition that doesn't receive new records, in the case of the gaps,
// the consumerLoop loop can end up into an infinite loop.
// TODO(v): Instead of counting the number of remaining records, we may rework it to use the partition end's offset
// from the time of lag calculation as the guard for the loop.
remaining -= int64(fetches.NumRecords())

for recIter := fetches.RecordIter(); !recIter.Done(); {
rec := recIter.Next()
recOffset = rec.Offset

if firstRec == nil {
firstRec = rec
}

// Stop consuming after we reached the sectionEnd marker.
// Stop consuming after we reached the sectionEndTime marker.
// NOTE: the timestamp of the record is when the record was produced relative to distributor's time.
if rec.Timestamp.After(sectionEnd) {
if rec.Timestamp.After(sectionEndTime) {
break consumerLoop
}

@@ -472,7 +472,6 @@
Metadata: marshallCommitMeta(commitRec.Timestamp.UnixMilli(), lastSeenOffset, lastBlockEnd.UnixMilli()),
}
newState := partitionState{
Lag: state.Lag - (commit.At - firstRec.Offset), // the new lag is the distance between fully processed offsets
Commit: commit,
CommitRecordTimestamp: commitRec.Timestamp,
LastSeenOffset: lastSeenOffset,
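Two pieces of the logic above are worth seeing in isolation. The sketch below is a minimal, self-contained rendering of (a) the cycle-boundary math the startup comments describe, and (b) the commit's core change: the consumer loop now terminates on the partition's end offset captured at lag-calculation time, instead of counting down the lag, which, per the removed comment, could over-count on a partition with offset gaps and spin forever if the partition is idle. Function names and the surrounding setup are assumptions, not code from this commit; the franz-go calls (PollFetches, EachError, RecordIter) are the ones the diff itself uses.

package blockbuilderexample

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// nextCycleEndAfter is an assumed reconstruction of the boundary math the
// comments above describe: the first <interval> boundary plus <buffer> that
// falls after t, and how long to wait for it.
func nextCycleEndAfter(t time.Time, interval, buffer time.Duration) (time.Time, time.Duration) {
	cycleEnd := t.Truncate(interval).Add(buffer)
	for !cycleEnd.After(t) {
		cycleEnd = cycleEnd.Add(interval)
	}
	return cycleEnd, cycleEnd.Sub(t)
}

// consumeSection shows the loop's new termination rule: poll until the
// consumed offset reaches cycleEndOffset-1 (the last offset that existed when
// the lag was measured) or a record's timestamp passes the section end.
// Guarding on a fixed end offset, unlike the old lag countdown, terminates
// even when the partition has offset gaps or a poll returns zero records.
func consumeSection(ctx context.Context, client *kgo.Client, sectionEndTime time.Time, cycleEndOffset int64) error {
consumerLoop:
	for recOffset := int64(-1); recOffset < cycleEndOffset-1; {
		if err := context.Cause(ctx); err != nil {
			return err
		}

		// PollFetches can return a non-failed fetch with zero records; the
		// offset guard above decides whether to poll again.
		fetches := client.PollFetches(ctx)
		fetches.EachError(func(_ string, _ int32, err error) {
			if !errors.Is(err, context.Canceled) {
				log.Printf("failed to fetch records: %v", err)
			}
		})

		for iter := fetches.RecordIter(); !iter.Done(); {
			rec := iter.Next()
			recOffset = rec.Offset

			// Records produced after the section's end are left for the next section.
			if rec.Timestamp.After(sectionEndTime) {
				break consumerLoop
			}
			// ... process rec, e.g. append its samples to the TSDB builder ...
		}
	}
	return nil
}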
23 changes: 1 addition & 22 deletions pkg/blockbuilder/blockbuilder_test.go
@@ -614,33 +614,14 @@ func TestPartitionStateFromLag(t *testing.T) {
wantState partitionState
}{
{
name: "no commit, no lag",
name: "no commit",
lag: kadm.GroupMemberLag{
Topic: testTopic,
Partition: 0,
Commit: kadm.Offset{},
Lag: 0,
},
fallbackMillis: testTime.UnixMilli(),
wantState: partitionState{
Lag: 0,
Commit: kadm.Offset{},
CommitRecordTimestamp: testTime,
LastSeenOffset: 0,
LastBlockEnd: time.UnixMilli(0),
},
},
{
name: "no commit, some lag",
lag: kadm.GroupMemberLag{
Topic: testTopic,
Partition: 0,
Commit: kadm.Offset{},
Lag: 10,
},
fallbackMillis: testTime.UnixMilli(),
wantState: partitionState{
Lag: 10,
Commit: kadm.Offset{},
CommitRecordTimestamp: testTime,
LastSeenOffset: 0,
@@ -653,11 +634,9 @@
Topic: testTopic,
Partition: 0,
Commit: testKafkaOffset,
Lag: 10,
},
fallbackMillis: testTime.UnixMilli(),
wantState: partitionState{
Lag: 10,
Commit: testKafkaOffset,
CommitRecordTimestamp: commitRecTs,
LastSeenOffset: lastRecOffset,
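The removed test case and the dropped Lag field track the same change: partitionState no longer carries lag, so the state derives entirely from the commit and its metadata, falling back to a caller-supplied timestamp when no usable commit exists. A rough sketch of that fallback shape, with a hypothetical decodeMeta parameter standing in for the package's real commit-metadata unmarshalling:

package blockbuilderexample

import (
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
)

type partitionState struct {
	// Commit is the offset of the next record we'll start consuming.
	Commit kadm.Offset
	// CommitRecordTimestamp is the timestamp of the record whose offset was
	// committed (not the time of the commit itself).
	CommitRecordTimestamp time.Time
	LastSeenOffset        int64
	LastBlockEnd          time.Time
}

// stateFromLag mirrors partitionStateFromLag's shape; decodeMeta is an
// assumed helper, not the package's actual function.
func stateFromLag(
	lag kadm.GroupMemberLag,
	fallbackMillis int64,
	decodeMeta func(meta string) (commitRecTs, lastSeenOffset, lastBlockEndTs int64, err error),
) partitionState {
	commitRecTs, lastSeenOffset, lastBlockEndTs, err := decodeMeta(lag.Commit.Metadata)
	if err != nil {
		// Broken metadata is treated as "no commit"; no reason to stop the cycle.
		commitRecTs, lastSeenOffset, lastBlockEndTs = 0, 0, 0
	}
	if commitRecTs == 0 {
		// No usable commit: replay from the fallback point rather than peeking at
		// the partition's first record, which could force an unbounded replay.
		commitRecTs = fallbackMillis
	}
	return partitionState{
		Commit:                lag.Commit,
		CommitRecordTimestamp: time.UnixMilli(commitRecTs),
		LastSeenOffset:        lastSeenOffset,
		LastBlockEnd:          time.UnixMilli(lastBlockEndTs),
	}
}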
4 changes: 4 additions & 0 deletions pkg/mimir/mimir.go
@@ -48,6 +48,7 @@ import (
alertbucketclient "github.com/grafana/mimir/pkg/alertmanager/alertstore/bucketclient"
alertstorelocal "github.com/grafana/mimir/pkg/alertmanager/alertstore/local"
"github.com/grafana/mimir/pkg/api"
"github.com/grafana/mimir/pkg/blockbuilder"
"github.com/grafana/mimir/pkg/compactor"
"github.com/grafana/mimir/pkg/continuoustest"
"github.com/grafana/mimir/pkg/distributor"
@@ -123,6 +124,7 @@ type Config struct {
Worker querier_worker.Config `yaml:"frontend_worker"`
Frontend frontend.CombinedFrontendConfig `yaml:"frontend"`
IngestStorage ingest.Config `yaml:"ingest_storage"`
BlockBuilder blockbuilder.Config `yaml:"block_builder" doc:"hidden"`
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
Compactor compactor.Config `yaml:"compactor"`
StoreGateway storegateway.Config `yaml:"store_gateway"`
@@ -181,6 +183,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
c.Worker.RegisterFlags(f)
c.Frontend.RegisterFlags(f, logger)
c.IngestStorage.RegisterFlags(f)
c.BlockBuilder.RegisterFlags(f, logger)
c.BlocksStorage.RegisterFlags(f)
c.Compactor.RegisterFlags(f, logger)
c.StoreGateway.RegisterFlags(f, logger)
@@ -730,6 +733,7 @@ type Mimir struct {
ActivityTracker *activitytracker.ActivityTracker
Vault *vault.Vault
UsageStatsReporter *usagestats.Reporter
BlockBuilder *blockbuilder.BlockBuilder
ContinuousTestManager *continuoustest.Manager
BuildInfoHandler http.Handler
}
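The mimir.go changes only wire the config and hold the service handle; the actual target registration lives in the fourth changed file, which isn't rendered on this page. For orientation, this is the usual dskit module pattern such a registration follows; the module name, init function, and dependency list here are illustrative assumptions, not taken from this commit:

package blockbuilderexample

import (
	"github.com/grafana/dskit/modules"
	"github.com/grafana/dskit/services"
)

// registerBlockBuilderModule sketches the registration step. In Mimir, the
// init function would construct the block-builder from the loaded config and
// return it as a dskit service.
func registerBlockBuilderModule(mm *modules.Manager, initFn func() (services.Service, error)) error {
	const blockBuilderTarget = "block-builder"
	mm.RegisterModule(blockBuilderTarget, initFn)
	// A runnable target typically depends on shared plumbing; the exact
	// dependency list is an assumption here.
	return mm.AddDependency(blockBuilderTarget, "overrides", "server")
}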