kafka replay speed: move error handling closer to actual ingestion #9349

Merged
228 changes: 123 additions & 105 deletions pkg/storage/ingest/pusher.go
@@ -42,9 +42,8 @@ type PusherCloser interface {
// pusherConsumer receives records from Kafka and pushes them to the storage.
// Each time a batch of records is received from Kafka, we instantiate a new pusherConsumer; this ensures we can retry if necessary and know whether we have completed that batch or not.
type pusherConsumer struct {
fallbackClientErrSampler *util_log.Sampler
metrics *pusherConsumerMetrics
logger log.Logger
metrics *pusherConsumerMetrics
logger log.Logger

kafkaConfig KafkaConfig

@@ -53,12 +52,14 @@ type pusherConsumer struct {

// newPusherConsumer creates a new pusherConsumer instance.
func newPusherConsumer(pusher Pusher, kafkaCfg KafkaConfig, metrics *pusherConsumerMetrics, logger log.Logger) *pusherConsumer {
// The layer below (parallelStoragePusher, parallelStorageShards, sequentialStoragePusher) will return all errors they see
// and potentially abort ingesting a batch if they encounter any error.
// We can safely ignore client errors and continue ingesting. We abort ingesting if we get any other error.
return &pusherConsumer{
pusher: pusher,
kafkaConfig: kafkaCfg,
metrics: metrics,
logger: logger,
fallbackClientErrSampler: util_log.NewSampler(kafkaCfg.FallbackClientErrorSampleRate),
pusher: pusher,
kafkaConfig: kafkaCfg,
metrics: metrics,
logger: logger,
}
}
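
As a rough illustration (not part of the diff) of the contract described in that comment: client errors are already handled by the layers below, so the consumer can treat anything the writer surfaces as a server error and merge the rest at Close. The helper name consumeSketch and the reqs slice below are hypothetical.

// Hypothetical sketch of the error contract above: client errors are already
// counted and logged by the layers below, so any error that reaches this level
// is treated as a server error and aborts the batch so it can be retried.
func consumeSketch(ctx context.Context, writer PusherCloser, reqs []*mimirpb.WriteRequest) error {
	for _, req := range reqs {
		if err := writer.PushToStorage(ctx, req); err != nil {
			return err // server error: stop now, the whole Kafka batch will be retried
		}
	}
	// Close flushes any outstanding async work and returns the server errors it saw.
	return multierror.New(writer.Close()...).Err()
}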

@@ -136,28 +137,22 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error {
cancel(cancellation.NewErrorf("done unmarshalling records"))

// We need to tell the storage writer that we're done and no more records are coming.
// err := c.close(ctx, writer)
spanLog := spanlogger.FromContext(ctx, log.NewNopLogger())
errs := writer.Close()
for eIdx := 0; eIdx < len(errs); eIdx++ {
err := errs[eIdx]
isServerErr := c.handlePushErr(ctx, "TODO", err, spanLog)
if !isServerErr {
errs[len(errs)-1], errs[eIdx] = errs[eIdx], errs[len(errs)-1]
errs = errs[:len(errs)-1]
eIdx--
}
}

return multierror.New(errs...).Err()
return multierror.New(writer.Close()...).Err()
}

func (c pusherConsumer) newStorageWriter() PusherCloser {
if c.kafkaConfig.IngestionConcurrency == 0 {
return newSequentialStoragePusher(c.metrics.storagePusherMetrics, c.pusher)
return newSequentialStoragePusher(c.metrics.storagePusherMetrics, c.pusher, c.kafkaConfig.FallbackClientErrorSampleRate, c.logger)
}

return newParallelStoragePusher(c.metrics.storagePusherMetrics, c.pusher, c.kafkaConfig.IngestionConcurrency, c.kafkaConfig.IngestionConcurrencyBatchSize, c.logger)
return newParallelStoragePusher(
c.metrics.storagePusherMetrics,
c.pusher,
c.kafkaConfig.FallbackClientErrorSampleRate,
c.kafkaConfig.IngestionConcurrency,
c.kafkaConfig.IngestionConcurrencyBatchSize,
c.logger,
)
}

func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req *mimirpb.WriteRequest, writer PusherCloser) error {
@@ -166,69 +161,26 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req

// Note that the implementation of the Pusher expects the tenantID to be in the context.
ctx = user.InjectOrgID(ctx, tenantID)
err := writer.PushToStorage(ctx, req)

c.metrics.totalRequests.Inc()

isServerErr := c.handlePushErr(ctx, tenantID, err, spanLog)
if isServerErr {
return err
}
return nil
}

func (c pusherConsumer) handlePushErr(ctx context.Context, tenantID string, err error, spanLog *spanlogger.SpanLogger) bool {
if err == nil {
return false
}
// Only return non-client errors; these will stop the processing of the current Kafka fetches and retry (possibly).
if !mimirpb.IsClientError(err) {
c.metrics.serverErrRequests.Inc()
_ = spanLog.Error(err)
return true
}

c.metrics.clientErrRequests.Inc()

// The error could be sampled or marked to be skipped in logs, so we check whether it should be
// logged before doing it.
if keep, reason := c.shouldLogClientError(ctx, err); keep {
if reason != "" {
err = fmt.Errorf("%w (%s)", err, reason)
}
// This error message is consistent with error message in Prometheus remote-write and OTLP handlers in distributors.
level.Warn(spanLog).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "user", tenantID, "insight", true, "err", err)
}
return false
}

// shouldLogClientError returns whether err should be logged.
func (c pusherConsumer) shouldLogClientError(ctx context.Context, err error) (bool, string) {
var optional middleware.OptionalLogging
if !errors.As(err, &optional) {
// If error isn't sampled yet, we wrap it into our sampler and try again.
err = c.fallbackClientErrSampler.WrapError(err)
if !errors.As(err, &optional) {
// We can get here if c.clientErrSampler is nil.
return true, ""
}
}
err := writer.PushToStorage(ctx, req)

return optional.ShouldLog(ctx)
return err
}

// sequentialStoragePusher receives mimirpb.WriteRequests, which are then pushed to the storage one by one.
type sequentialStoragePusher struct {
metrics *storagePusherMetrics
metrics *storagePusherMetrics
errorHandler *pushErrorHandler

pusher Pusher
}

// newSequentialStoragePusher creates a new sequentialStoragePusher instance.
func newSequentialStoragePusher(metrics *storagePusherMetrics, pusher Pusher) sequentialStoragePusher {
func newSequentialStoragePusher(metrics *storagePusherMetrics, pusher Pusher, sampleRate int64, logger log.Logger) sequentialStoragePusher {
return sequentialStoragePusher{
metrics: metrics,
pusher: pusher,
metrics: metrics,
pusher: pusher,
errorHandler: newPushErrorHandler(metrics, util_log.NewSampler(sampleRate), logger),
}
}

@@ -239,7 +191,11 @@ func (ssp sequentialStoragePusher) PushToStorage(ctx context.Context, wr *mimirp
ssp.metrics.processingTime.WithLabelValues(requestContents(wr)).Observe(time.Since(now).Seconds())
}(time.Now())

return ssp.pusher.PushToStorage(ctx, wr)
if err := ssp.pusher.PushToStorage(ctx, wr); ssp.errorHandler.IsServerError(ctx, err) {
return err
}

return nil
}

// Close implements the PusherCloser interface.
@@ -256,17 +212,19 @@ type parallelStoragePusher struct {
// pushers is map["$tenant|$source"]*parallelStorageShards
pushers map[string]*parallelStorageShards
upstreamPusher Pusher
errorHandler *pushErrorHandler
numShards int
batchSize int
}

// newParallelStoragePusher creates a new parallelStoragePusher instance.
func newParallelStoragePusher(metrics *storagePusherMetrics, pusher Pusher, numShards int, batchSize int, logger log.Logger) *parallelStoragePusher {
func newParallelStoragePusher(metrics *storagePusherMetrics, pusher Pusher, sampleRate int64, numShards int, batchSize int, logger log.Logger) *parallelStoragePusher {
return &parallelStoragePusher{
logger: log.With(logger, "component", "parallel-storage-pusher"),
pushers: make(map[string]*parallelStorageShards),
upstreamPusher: pusher,
numShards: numShards,
errorHandler: newPushErrorHandler(metrics, util_log.NewSampler(sampleRate), logger),
batchSize: batchSize,
metrics: metrics,
}
@@ -303,7 +261,7 @@ func (c parallelStoragePusher) shardsFor(userID string, requestSource mimirpb.Wr
}
// Use the same hashing function that's used for stripes in the TSDB. That way we make use of the low-contention property of stripes.
hashLabels := labels.Labels.Hash
p := newParallelStorageShards(c.metrics, c.numShards, c.batchSize, batchingQueueCapacity, c.upstreamPusher, hashLabels)
p := newParallelStorageShards(c.metrics, c.errorHandler, c.numShards, c.batchSize, batchingQueueCapacity, c.upstreamPusher, hashLabels)
c.pushers[userID+"|"+requestSource.String()] = p
return p
}
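
For illustration only (the helper below is hypothetical), the stripe-style shard selection mentioned above reduces to hashing the series' label set modulo the shard count, so the same series always lands on the same shard and keeps its write ordering:

// shardForSeries shows the mapping; the real code goes through the labelsHashFunc
// field so tests can substitute the hash function.
func shardForSeries(lbls labels.Labels, numShards int) uint64 {
	return lbls.Hash() % uint64(numShards)
}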
@@ -313,7 +271,8 @@ type labelsHashFunc func(labels.Labels) uint64
// parallelStorageShards is a collection of shards that are used to parallelize the writes to the storage by series.
// Each series is hashed to a shard that contains its own batchingQueue.
type parallelStorageShards struct {
metrics *storagePusherMetrics
metrics *storagePusherMetrics
errorHandler *pushErrorHandler

pusher Pusher
hashLabels labelsHashFunc
@@ -334,15 +293,16 @@ type flushableWriteRequest struct {
}

// newParallelStorageShards creates a new parallelStorageShards instance.
func newParallelStorageShards(metrics *storagePusherMetrics, numShards int, batchSize int, capacity int, pusher Pusher, hashLabels labelsHashFunc) *parallelStorageShards {
func newParallelStorageShards(metrics *storagePusherMetrics, errorHandler *pushErrorHandler, numShards int, batchSize int, capacity int, pusher Pusher, hashLabels labelsHashFunc) *parallelStorageShards {
p := &parallelStorageShards{
numShards: numShards,
pusher: pusher,
hashLabels: hashLabels,
capacity: capacity,
metrics: metrics,
batchSize: batchSize,
wg: &sync.WaitGroup{},
numShards: numShards,
pusher: pusher,
errorHandler: errorHandler,
hashLabels: hashLabels,
capacity: capacity,
metrics: metrics,
batchSize: batchSize,
wg: &sync.WaitGroup{},
}

p.start()
@@ -352,47 +312,36 @@ func newParallelStorageShards(metrics *storagePusherMetrics, numShards int, batc

// ShardWriteRequest hashes each time series in the write request and sends it to the appropriate shard, where it is handled by that shard's current batchingQueue.
// ShardWriteRequest ignores SkipLabelNameValidation because that field is only used in the distributor and not in the ingester.
// ShardWriteRequest aborts the request if it encounters an error.
func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *mimirpb.WriteRequest) error {
var (
builder labels.ScratchBuilder
nonCopiedLabels labels.Labels
errs multierror.MultiError
)

for _, ts := range request.Timeseries {
mimirpb.FromLabelAdaptersOverwriteLabels(&builder, ts.Labels, &nonCopiedLabels)
shard := p.hashLabels(nonCopiedLabels) % uint64(p.numShards)

if err := p.shards[shard].AddToBatch(ctx, request.Source, ts); err != nil {
// TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error.
// We'll do that in the next PR as otherwise it's too many changes right now.
if !mimirpb.IsClientError(err) {
return err
}

errs.Add(err)
return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err)
}
}

// Push metadata to every shard in a round-robin fashion.
shard := 0
for mdIdx := range request.Metadata {
if err := p.shards[shard].AddMetadataToBatch(ctx, request.Source, request.Metadata[mdIdx]); err != nil {
// TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error.
// We'll do that in the next PR as otherwise it's too many changes right now.
if !mimirpb.IsClientError(err) {
return err
}

errs.Add(err)
return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err)
}
shard++
shard %= p.numShards
}

// We might have some data left in some of the queues in the shards, but they will be flushed eventually once Stop is called, and we're certain that no more data is coming.
// Return whatever errors we have now, we'll call stop eventually and collect the rest.
return errs.Err()
// So far we didn't find any non-client errors that are worth aborting for.
// We'll call Close eventually and collect the rest.
return nil
}

// Stop stops all the shards and waits for them to finish.
@@ -433,8 +382,10 @@ func (p *parallelStorageShards) run(queue *batchingQueue) {

err := p.pusher.PushToStorage(wr.Context, wr.WriteRequest)

// The error handler needs to determine if this is a server error or not.
// If it is, we need to stop processing as the batch will be retried. When it is not (a client error), the handler logs it, and we can continue processing.
p.metrics.processingTime.WithLabelValues(requestContents(wr.WriteRequest)).Observe(time.Since(processingStart).Seconds())
if err != nil {
if err != nil && p.errorHandler.IsServerError(wr.Context, err) {
queue.ErrorChannel() <- err
}
}
@@ -454,6 +405,73 @@ func requestContents(request *mimirpb.WriteRequest) string {
}
}

// pushErrorHandler filters out client errors and logs them.
// It only returns errors that are not client errors.
type pushErrorHandler struct {
metrics *storagePusherMetrics
clientErrSampler *util_log.Sampler
fallbackLogger log.Logger
}

// newPushErrorHandler creates a new pushErrorHandler instance.
func newPushErrorHandler(metrics *storagePusherMetrics, clientErrSampler *util_log.Sampler, fallbackLogger log.Logger) *pushErrorHandler {
return &pushErrorHandler{
metrics: metrics,
clientErrSampler: clientErrSampler,
fallbackLogger: fallbackLogger,
}
}

// IsServerError returns whether the error is a server error or not. The context is used to extract the span from the trace.
// When the error is a server error, we'll add it to the span passed down in the context and return true to indicate that we should stop processing.
// When it is a client error, we'll add it to the span and log it to stdout/stderr.
func (p *pushErrorHandler) IsServerError(ctx context.Context, err error) bool {
// For every request, we have to determine if it's a server error.
// For the sake of simplicity, let's increment the total requests counter here.
p.metrics.totalRequests.Inc()

spanLog := spanlogger.FromContext(ctx, p.fallbackLogger)
if err == nil {
return false
}

// Only return non-client errors; these will stop the processing of the current Kafka fetches and retry (possibly).
if !mimirpb.IsClientError(err) {
p.metrics.serverErrRequests.Inc()
_ = spanLog.Error(err)
return true
}

p.metrics.clientErrRequests.Inc()

// The error could be sampled or marked to be skipped in logs, so we check whether it should be
// logged before doing it.
if keep, reason := p.shouldLogClientError(ctx, err); keep {
if reason != "" {
err = fmt.Errorf("%w (%s)", err, reason)
}

// This error message is consistent with the error message in the Prometheus remote-write and OTLP handlers in distributors.
level.Warn(spanLog).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "insight", true, "err", err)
}
return false
}

// shouldLogClientError returns whether err should be logged.
func (p *pushErrorHandler) shouldLogClientError(ctx context.Context, err error) (bool, string) {
var optional middleware.OptionalLogging
if !errors.As(err, &optional) {
// If error isn't sampled yet, we wrap it into our sampler and try again.
err = p.clientErrSampler.WrapError(err)
if !errors.As(err, &optional) {
// We can get here if p.clientErrSampler is nil.
return true, ""
}
}

return optional.ShouldLog(ctx)
}
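
A hedged usage sketch of the new handler (the wrapper below is hypothetical, not in the PR): callers push first and let IsServerError classify the result, so only server errors propagate while client errors are sampled into the logs and ingestion continues.

// pushAndClassify mirrors how sequentialStoragePusher and the shard goroutines
// are expected to use pushErrorHandler.
func pushAndClassify(ctx context.Context, pusher Pusher, handler *pushErrorHandler, req *mimirpb.WriteRequest) error {
	err := pusher.PushToStorage(ctx, req)
	if handler.IsServerError(ctx, err) {
		return err // abort; the current Kafka fetch will be retried
	}
	return nil // nil or client error: already counted and possibly logged, keep ingesting
}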

// batchingQueue is a queue that batches the incoming time series according to the batch size.
// Once the batch size is reached, the batch is pushed to a channel which can be accessed through the Channel() method.
type batchingQueue struct {
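
The body of batchingQueue is collapsed in this diff. As a rough, self-contained sketch of the batching idea its comment describes (the type and methods below are hypothetical simplifications, not the actual implementation): series accumulate until the batch size is hit, then the whole batch is handed to a consumer goroutine over a channel.

// miniBatchingQueue is a hypothetical, simplified stand-in for batchingQueue.
type miniBatchingQueue struct {
	batchSize int
	current   []mimirpb.PreallocTimeseries
	ch        chan []mimirpb.PreallocTimeseries
}

func newMiniBatchingQueue(batchSize, capacity int) *miniBatchingQueue {
	return &miniBatchingQueue{batchSize: batchSize, ch: make(chan []mimirpb.PreallocTimeseries, capacity)}
}

// Add appends a series and flushes the batch to the channel once it is full.
func (q *miniBatchingQueue) Add(ts mimirpb.PreallocTimeseries) {
	q.current = append(q.current, ts)
	if len(q.current) >= q.batchSize {
		q.ch <- q.current
		q.current = nil
	}
}

// Channel exposes the flushed batches to the consumer goroutine.
func (q *miniBatchingQueue) Channel() <-chan []mimirpb.PreallocTimeseries { return q.ch }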