kafka replay speed: move error handling closer to actual ingestion
Previously, we'd let errors bubble up and only decide whether to abort the request at the very top (`pusherConsumer`). This meant that we'd potentially buffer more requests before detecting an error.

This change extracts the error handling logic into a `Pusher` implementation: `clientErrorFilteringPusher`. It logs client errors and then swallows them. We inject it in front of the ingester, so the parallel storage implementation can abort as soon as possible instead of collecting and bubbling up the errors.
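To illustrate the idea (a minimal sketch, not the code from this commit), here is a self-contained Pusher decorator that logs and swallows client errors while letting server errors propagate. The simplified `Pusher` interface, the `clientError` type, and the `flakyStorage` stand-in are assumptions for the example, not Mimir's real types:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
)

// Pusher is a simplified stand-in for the interface the consumer pushes to.
type Pusher interface {
	PushToStorage(ctx context.Context, req string) error
}

// clientError marks errors caused by bad client input; these are safe to swallow.
type clientError struct{ msg string }

func (e clientError) Error() string { return e.msg }

// clientErrorFilteringPusher wraps another Pusher: it logs client errors and
// swallows them, so only server-side errors propagate and can abort the batch.
type clientErrorFilteringPusher struct {
	upstream Pusher
}

func (p clientErrorFilteringPusher) PushToStorage(ctx context.Context, req string) error {
	err := p.upstream.PushToStorage(ctx, req)
	var cerr clientError
	if errors.As(err, &cerr) {
		log.Printf("client error while ingesting (swallowed): %v", cerr)
		return nil // client errors don't abort ingestion
	}
	return err // nil, or a server error the caller should abort on
}

// flakyStorage is a stand-in for the real ingester.
type flakyStorage struct{}

func (flakyStorage) PushToStorage(_ context.Context, req string) error {
	if req == "bad-labels" {
		return clientError{msg: "invalid labels"}
	}
	return nil
}

func main() {
	var pusher Pusher = clientErrorFilteringPusher{upstream: flakyStorage{}}
	fmt.Println(pusher.PushToStorage(context.Background(), "bad-labels")) // <nil>: client error swallowed
	fmt.Println(pusher.PushToStorage(context.Background(), "ok"))         // <nil>
}
```

The real implementation in the diff below additionally samples how often client errors are logged and increments the client/server error counters.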

Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov committed Sep 20, 2024
1 parent fb30f39 commit d32b522
Showing 2 changed files with 53 additions and 47 deletions.
89 changes: 48 additions & 41 deletions pkg/storage/ingest/pusher.go
@@ -44,9 +44,8 @@ type PusherCloser interface {
// pusherConsumer receives records from Kafka and pushes them to the storage.
// Each time a batch of records is received from Kafka, we instantiate a new pusherConsumer; this is to ensure we can retry if necessary and know whether we have completed that batch or not.
type pusherConsumer struct {
fallbackClientErrSampler *util_log.Sampler
metrics *pusherConsumerMetrics
logger log.Logger
metrics *pusherConsumerMetrics
logger log.Logger

kafkaConfig KafkaConfig

@@ -93,12 +92,15 @@ func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics

// newPusherConsumer creates a new pusherConsumer instance.
func newPusherConsumer(pusher Pusher, kafkaCfg KafkaConfig, metrics *pusherConsumerMetrics, logger log.Logger) *pusherConsumer {
// The layer below (parallelStoragePusher, parallelStorageShards, sequentialStoragePusher) will return all errors they see
// and potentially abort ingesting a batch if they encounter any error.
// We can safely ignore client errors and continue ingesting. We abort ingesting if we get any other error.
pusher = newClientErrorFilteringPusher(pusher, metrics, util_log.NewSampler(kafkaCfg.FallbackClientErrorSampleRate), logger)
return &pusherConsumer{
pusher: pusher,
kafkaConfig: kafkaCfg,
metrics: metrics,
logger: logger,
fallbackClientErrSampler: util_log.NewSampler(kafkaCfg.FallbackClientErrorSampleRate),
pusher: pusher,
kafkaConfig: kafkaCfg,
metrics: metrics,
logger: logger,
}
}

@@ -172,20 +174,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error {
cancel(cancellation.NewErrorf("done unmarshalling records"))

// We need to tell the storage writer that we're done and no more records are coming.
// err := c.close(ctx, writer)
spanLog := spanlogger.FromContext(ctx, log.NewNopLogger())
errs := writer.Close()
for eIdx := 0; eIdx < len(errs); eIdx++ {
err := errs[eIdx]
isServerErr := c.handlePushErr(ctx, "TODO", err, spanLog)
if !isServerErr {
errs[len(errs)-1], errs[eIdx] = errs[eIdx], errs[len(errs)-1]
errs = errs[:len(errs)-1]
eIdx--
}
}

return multierror.New(errs...).Err()
return multierror.New(writer.Close()...).Err()
}

func (c pusherConsumer) newStorageWriter() PusherCloser {
@@ -209,45 +198,68 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req
// TODO dimitarvdimitrov processing time is flawed because it's only counting enqueuing time, not processing time.
c.metrics.processingTimeSeconds.Observe(time.Since(processingStart).Seconds())
c.metrics.totalRequests.Inc()
return err
}

// clientErrorFilteringPusher filters out client errors and logs them.
// It only returns errors that are not client errors.
type clientErrorFilteringPusher struct {
upstream Pusher
metrics *pusherConsumerMetrics
clientErrSampler *util_log.Sampler
fallbackLogger log.Logger
}

func newClientErrorFilteringPusher(upstream Pusher, metrics *pusherConsumerMetrics, clientErrSampler *util_log.Sampler, fallbackLogger log.Logger) *clientErrorFilteringPusher {
return &clientErrorFilteringPusher{
upstream: upstream,
metrics: metrics,
clientErrSampler: clientErrSampler,
fallbackLogger: fallbackLogger,
}
}

isServerErr := c.handlePushErr(ctx, tenantID, err, spanLog)
if isServerErr {
func (p *clientErrorFilteringPusher) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error {
err := p.upstream.PushToStorage(ctx, request)
if p.handlePushErr(ctx, err, spanlogger.FromContext(ctx, p.fallbackLogger)) {
return err
}

return nil
}

func (c pusherConsumer) handlePushErr(ctx context.Context, tenantID string, err error, spanLog *spanlogger.SpanLogger) bool {
func (p *clientErrorFilteringPusher) handlePushErr(ctx context.Context, err error, spanLog *spanlogger.SpanLogger) bool {
if err == nil {
return false
}
// Only return non-client errors; these will stop the processing of the current Kafka fetches and retry (possibly).
if !mimirpb.IsClientError(err) {
c.metrics.serverErrRequests.Inc()
p.metrics.serverErrRequests.Inc()
_ = spanLog.Error(err)
return true
}

c.metrics.clientErrRequests.Inc()
p.metrics.clientErrRequests.Inc()

// The error could be sampled or marked to be skipped in logs, so we check whether it should be
// logged before doing it.
if keep, reason := c.shouldLogClientError(ctx, err); keep {
if keep, reason := p.shouldLogClientError(ctx, err); keep {
if reason != "" {
err = fmt.Errorf("%w (%s)", err, reason)
}

// This error message is consistent with error message in Prometheus remote-write and OTLP handlers in distributors.
level.Warn(spanLog).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "user", tenantID, "insight", true, "err", err)
level.Warn(spanLog).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "insight", true, "err", err)
}
return false
}

// shouldLogClientError returns whether err should be logged.
func (c pusherConsumer) shouldLogClientError(ctx context.Context, err error) (bool, string) {
func (p *clientErrorFilteringPusher) shouldLogClientError(ctx context.Context, err error) (bool, string) {
var optional middleware.OptionalLogging
if !errors.As(err, &optional) {
// If error isn't sampled yet, we wrap it into our sampler and try again.
err = c.fallbackClientErrSampler.WrapError(err)
err = p.clientErrSampler.WrapError(err)
if !errors.As(err, &optional) {
// We can get here if c.clientErrSampler is nil.
return true, ""
@@ -383,11 +395,11 @@ func newParallelStorageShards(numTimeSeriesPerFlush prometheus.Histogram, numSha
}

// ShardWriteRequest hashes each time series in the write requests and sends them to the appropriate shard which is then handled by the current batchingQueue in that shard.
// ShardWriteRequest aborts the request if it encounters an error.
func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *mimirpb.WriteRequest) error {
var (
builder labels.ScratchBuilder
nonCopiedLabels labels.Labels
errs multierror.MultiError
)

for _, ts := range request.Timeseries {
@@ -397,19 +409,14 @@ func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *
// TODO: Add metrics to measure how long are items sitting in the queue before they are flushed.
// TODO dimitarvdimitrov support metadata and the rest of the fields; perhaps cut a new request for different values of SkipLabelNameValidation?
if err := p.shards[shard].AddToBatch(ctx, ts); err != nil {
// TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error.
// We'll do that in the next PR as otherwise it's too many changes right now.
if !mimirpb.IsClientError(err) {
return err
}

errs.Add(err)
return fmt.Errorf("encountered a non-client error when ingesting shard; this error was for a previous write request for the same tenant: %w", err)
}
}

// We might have some data left in some of the queues in the shards, but they will be flushed eventually once Stop is called, and we're certain that no more data is coming.
// Return whatever errors we have now, we'll call stop eventually and collect the rest.
return errs.Err()
// So far we didn't find any non-client errors that are worth aborting for.
// We'll call Close eventually and collect the rest.
return nil
}

// Stop stops all the shards and waits for them to finish.
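As an aside, the `ShardWriteRequest` doc comment above describes hashing each series to a shard that owns its own batching queue. Here is a toy sketch of that idea; the FNV hash over a label string is an illustrative assumption, not Mimir's actual label hashing:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor picks a shard for a series by hashing its labels, so every sample
// for the same series always lands on the same per-shard queue.
func shardFor(labels string, numShards int) int {
	h := fnv.New32a()
	_, _ = h.Write([]byte(labels))
	return int(h.Sum32() % uint32(numShards))
}

func main() {
	const numShards = 4
	// Each shard owns a queue; series are appended to the queue of their shard
	// and flushed to storage in batches.
	queues := make([][]string, numShards)
	for _, series := range []string{
		`up{job="a"}`,
		`up{job="b"}`,
		`up{job="a"}`, // same series as the first one -> same shard
	} {
		s := shardFor(series, numShards)
		queues[s] = append(queues[s], series)
	}
	for i, q := range queues {
		fmt.Printf("shard %d: %v\n", i, q)
	}
}
```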
11 changes: 5 additions & 6 deletions pkg/storage/ingest/pusher_test.go
@@ -164,8 +164,8 @@ func TestPusherConsumer(t *testing.T) {
expectedWRs: writeReqs[0:3],
expErr: "", // since all of those were client errors, we don't return an error
expectedLogLines: []string{
"method=pusherConsumer.pushToStorage level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" user=t1 insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"",
"method=pusherConsumer.pushToStorage level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" user=t1 insight=true err=\"rpc error: code = Unknown desc = ingester test error\"",
"user=t1 level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"",
"user=t1 level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" insight=true err=\"rpc error: code = Unknown desc = ingester test error\"",
},
},
"ingester server error": {
@@ -183,7 +183,7 @@
expectedWRs: writeReqs[0:2], // the rest of the requests are not attempted
expErr: "ingester internal error",
expectedLogLines: []string{
"method=pusherConsumer.pushToStorage level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" user=t1 insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"",
"user=t1 level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"",
},
},
}
@@ -238,7 +238,7 @@ func removeUnimportantLogFields(lines []string) []string {
return lines
}

func TestPusherConsumer_clientErrorSampling(t *testing.T) {
func TestClientErrorFilteringPusher_PushToStorage(t *testing.T) {
type testCase struct {
sampler *util_log.Sampler
err error
@@ -278,8 +278,7 @@ func TestPusherConsumer_clientErrorSampling(t *testing.T) {
},
} {
t.Run(name, func(t *testing.T) {
c := newPusherConsumer(nil, KafkaConfig{}, newPusherConsumerMetrics(prometheus.NewPedanticRegistry()), log.NewNopLogger())
c.fallbackClientErrSampler = tc.sampler
c := newClientErrorFilteringPusher(nil, newPusherConsumerMetrics(prometheus.NewPedanticRegistry()), tc.sampler, log.NewNopLogger())

sampled, reason := c.shouldLogClientError(context.Background(), tc.err)
assert.Equal(t, tc.expectedSampled, sampled)
