From b5b5407b498a3ac334f3a599fccd09ebb678dec5 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Fri, 20 Sep 2024 12:54:25 +0200 Subject: [PATCH 1/5] kafka replay speed: move error handling closer to actual ingestion Previously, we'd let error bubble-up and only take decisions on whether to abort the request or not at the very top (`pusherConsumer`). This meant that we'd potentially buffer more requests before we detect an error. This change extracts error handling logic into a `Pusher` implementation: `clientErrorFilteringPusher`. This implementation logs client errors and then swallows them. We inject that implementation in front of the ingester. This means that the parallel storage implementation can abort ASAP instead of collecting and bubbling up the errors. Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/pusher.go | 89 +++++++++++++++++-------------- pkg/storage/ingest/pusher_test.go | 11 ++-- 2 files changed, 53 insertions(+), 47 deletions(-) diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go index ec5ac38fd26..da0834616d0 100644 --- a/pkg/storage/ingest/pusher.go +++ b/pkg/storage/ingest/pusher.go @@ -42,9 +42,8 @@ type PusherCloser interface { // pusherConsumer receivers records from Kafka and pushes them to the storage. // Each time a batch of records is received from Kafka, we instantiate a new pusherConsumer, this is to ensure we can retry if necessary and know whether we have completed that batch or not. type pusherConsumer struct { - fallbackClientErrSampler *util_log.Sampler - metrics *pusherConsumerMetrics - logger log.Logger + metrics *pusherConsumerMetrics + logger log.Logger kafkaConfig KafkaConfig @@ -53,12 +52,15 @@ type pusherConsumer struct { // newPusherConsumer creates a new pusherConsumer instance. func newPusherConsumer(pusher Pusher, kafkaCfg KafkaConfig, metrics *pusherConsumerMetrics, logger log.Logger) *pusherConsumer { + // The layer below (parallelStoragePusher, parallelStorageShards, sequentialStoragePusher) will return all errors they see + // and potentially ingesting a batch if they encounter any error. + // We can safely ignore client errors and continue ingesting. We abort ingesting if we get any other error. + pusher = newClientErrorFilteringPusher(pusher, metrics, util_log.NewSampler(kafkaCfg.FallbackClientErrorSampleRate), logger) return &pusherConsumer{ - pusher: pusher, - kafkaConfig: kafkaCfg, - metrics: metrics, - logger: logger, - fallbackClientErrSampler: util_log.NewSampler(kafkaCfg.FallbackClientErrorSampleRate), + pusher: pusher, + kafkaConfig: kafkaCfg, + metrics: metrics, + logger: logger, } } @@ -136,20 +138,7 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error { cancel(cancellation.NewErrorf("done unmarshalling records")) // We need to tell the storage writer that we're done and no more records are coming. - // err := c.close(ctx, writer) - spanLog := spanlogger.FromContext(ctx, log.NewNopLogger()) - errs := writer.Close() - for eIdx := 0; eIdx < len(errs); eIdx++ { - err := errs[eIdx] - isServerErr := c.handlePushErr(ctx, "TODO", err, spanLog) - if !isServerErr { - errs[len(errs)-1], errs[eIdx] = errs[eIdx], errs[len(errs)-1] - errs = errs[:len(errs)-1] - eIdx-- - } - } - - return multierror.New(errs...).Err() + return multierror.New(writer.Close()...).Err() } func (c pusherConsumer) newStorageWriter() PusherCloser { @@ -169,45 +158,68 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req err := writer.PushToStorage(ctx, req) c.metrics.totalRequests.Inc() + return err +} - isServerErr := c.handlePushErr(ctx, tenantID, err, spanLog) - if isServerErr { +// clientErrorFilteringPusher filters out client errors and logs them. +// It only returns errors that are not client errors. +type clientErrorFilteringPusher struct { + upstream Pusher + metrics *pusherConsumerMetrics + clientErrSampler *util_log.Sampler + fallbackLogger log.Logger +} + +func newClientErrorFilteringPusher(upstream Pusher, metrics *pusherConsumerMetrics, clientErrSampler *util_log.Sampler, fallbackLogger log.Logger) *clientErrorFilteringPusher { + return &clientErrorFilteringPusher{ + upstream: upstream, + metrics: metrics, + clientErrSampler: clientErrSampler, + fallbackLogger: fallbackLogger, + } +} + +func (p *clientErrorFilteringPusher) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error { + err := p.upstream.PushToStorage(ctx, request) + if p.handlePushErr(ctx, err, spanlogger.FromContext(ctx, p.fallbackLogger)) { return err } + return nil } -func (c pusherConsumer) handlePushErr(ctx context.Context, tenantID string, err error, spanLog *spanlogger.SpanLogger) bool { +func (p *clientErrorFilteringPusher) handlePushErr(ctx context.Context, err error, spanLog *spanlogger.SpanLogger) bool { if err == nil { return false } // Only return non-client errors; these will stop the processing of the current Kafka fetches and retry (possibly). if !mimirpb.IsClientError(err) { - c.metrics.serverErrRequests.Inc() + p.metrics.serverErrRequests.Inc() _ = spanLog.Error(err) return true } - c.metrics.clientErrRequests.Inc() + p.metrics.clientErrRequests.Inc() // The error could be sampled or marked to be skipped in logs, so we check whether it should be // logged before doing it. - if keep, reason := c.shouldLogClientError(ctx, err); keep { + if keep, reason := p.shouldLogClientError(ctx, err); keep { if reason != "" { err = fmt.Errorf("%w (%s)", err, reason) } + // This error message is consistent with error message in Prometheus remote-write and OTLP handlers in distributors. - level.Warn(spanLog).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "user", tenantID, "insight", true, "err", err) + level.Warn(spanLog).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "insight", true, "err", err) } return false } // shouldLogClientError returns whether err should be logged. -func (c pusherConsumer) shouldLogClientError(ctx context.Context, err error) (bool, string) { +func (p *clientErrorFilteringPusher) shouldLogClientError(ctx context.Context, err error) (bool, string) { var optional middleware.OptionalLogging if !errors.As(err, &optional) { // If error isn't sampled yet, we wrap it into our sampler and try again. - err = c.fallbackClientErrSampler.WrapError(err) + err = p.clientErrSampler.WrapError(err) if !errors.As(err, &optional) { // We can get here if c.clientErrSampler is nil. return true, "" @@ -352,11 +364,11 @@ func newParallelStorageShards(metrics *storagePusherMetrics, numShards int, batc // ShardWriteRequest hashes each time series in the write requests and sends them to the appropriate shard which is then handled by the current batchingQueue in that shard. // ShardWriteRequest ignores SkipLabelNameValidation because that field is only used in the distributor and not in the ingester. +// ShardWriteRequest aborts the request if it encounters an error. func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *mimirpb.WriteRequest) error { var ( builder labels.ScratchBuilder nonCopiedLabels labels.Labels - errs multierror.MultiError ) for _, ts := range request.Timeseries { @@ -364,13 +376,7 @@ func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request * shard := p.hashLabels(nonCopiedLabels) % uint64(p.numShards) if err := p.shards[shard].AddToBatch(ctx, request.Source, ts); err != nil { - // TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error. - // We'll do that in the next PR as otherwise it's too many changes right now. - if !mimirpb.IsClientError(err) { - return err - } - - errs.Add(err) + return fmt.Errorf("encountered a non-client error when ingesting shard; this error was for a previous write request for the same tenant: %w", err) } } @@ -391,8 +397,9 @@ func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request * } // We might some data left in some of the queues in the shards, but they will be flushed eventually once Stop is called, and we're certain that no more data is coming. - // Return whatever errors we have now, we'll call stop eventually and collect the rest. - return errs.Err() + // So far we didn't find any non-client errors that are worth aborting for. + // We'll call Close eventually and collect the rest. + return nil } // Stop stops all the shards and waits for them to finish. diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go index 3ccde7d3c41..4c192852af2 100644 --- a/pkg/storage/ingest/pusher_test.go +++ b/pkg/storage/ingest/pusher_test.go @@ -164,8 +164,8 @@ func TestPusherConsumer(t *testing.T) { expectedWRs: writeReqs[0:3], expErr: "", // since all fof those were client errors, we don't return an error expectedLogLines: []string{ - "method=pusherConsumer.pushToStorage level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" user=t1 insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"", - "method=pusherConsumer.pushToStorage level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" user=t1 insight=true err=\"rpc error: code = Unknown desc = ingester test error\"", + "user=t1 level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"", + "user=t1 level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" insight=true err=\"rpc error: code = Unknown desc = ingester test error\"", }, }, "ingester server error": { @@ -183,7 +183,7 @@ func TestPusherConsumer(t *testing.T) { expectedWRs: writeReqs[0:2], // the rest of the requests are not attempted expErr: "ingester internal error", expectedLogLines: []string{ - "method=pusherConsumer.pushToStorage level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" user=t1 insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"", + "user=t1 level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"", }, }, } @@ -239,7 +239,7 @@ func removeUnimportantLogFields(lines []string) []string { return lines } -func TestPusherConsumer_clientErrorSampling(t *testing.T) { +func TestClientErrorFilteringPusher_PushToStorage(t *testing.T) { type testCase struct { sampler *util_log.Sampler err error @@ -279,8 +279,7 @@ func TestPusherConsumer_clientErrorSampling(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - c := newPusherConsumer(nil, KafkaConfig{}, newPusherConsumerMetrics(prometheus.NewPedanticRegistry()), log.NewNopLogger()) - c.fallbackClientErrSampler = tc.sampler + c := newClientErrorFilteringPusher(nil, newPusherConsumerMetrics(prometheus.NewPedanticRegistry()), tc.sampler, log.NewNopLogger()) sampled, reason := c.shouldLogClientError(context.Background(), tc.err) assert.Equal(t, tc.expectedSampled, sampled) From 08ef695db6861dc3ea388ccfcd5a075c4204a7d4 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 25 Sep 2024 17:33:41 +0200 Subject: [PATCH 2/5] Move total requests tracking where errors are tracked Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/pusher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go index da0834616d0..65f3f22aece 100644 --- a/pkg/storage/ingest/pusher.go +++ b/pkg/storage/ingest/pusher.go @@ -157,7 +157,6 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req ctx = user.InjectOrgID(ctx, tenantID) err := writer.PushToStorage(ctx, req) - c.metrics.totalRequests.Inc() return err } @@ -181,6 +180,7 @@ func newClientErrorFilteringPusher(upstream Pusher, metrics *pusherConsumerMetri func (p *clientErrorFilteringPusher) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error { err := p.upstream.PushToStorage(ctx, request) + p.metrics.totalRequests.Inc() if p.handlePushErr(ctx, err, spanlogger.FromContext(ctx, p.fallbackLogger)) { return err } From 1263e492f342ce75b50f99ea92ef320ca05d9f5c Mon Sep 17 00:00:00 2001 From: gotjosh Date: Wed, 25 Sep 2024 17:28:36 +0100 Subject: [PATCH 3/5] WIP Signed-off-by: gotjosh --- pkg/storage/ingest/pusher.go | 186 ++++++++++++++------------- pkg/storage/ingest/pusher_metrics.go | 29 ++--- pkg/storage/ingest/pusher_test.go | 2 +- 3 files changed, 113 insertions(+), 104 deletions(-) diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go index 65f3f22aece..90c74f3b0c6 100644 --- a/pkg/storage/ingest/pusher.go +++ b/pkg/storage/ingest/pusher.go @@ -55,7 +55,6 @@ func newPusherConsumer(pusher Pusher, kafkaCfg KafkaConfig, metrics *pusherConsu // The layer below (parallelStoragePusher, parallelStorageShards, sequentialStoragePusher) will return all errors they see // and potentially ingesting a batch if they encounter any error. // We can safely ignore client errors and continue ingesting. We abort ingesting if we get any other error. - pusher = newClientErrorFilteringPusher(pusher, metrics, util_log.NewSampler(kafkaCfg.FallbackClientErrorSampleRate), logger) return &pusherConsumer{ pusher: pusher, kafkaConfig: kafkaCfg, @@ -143,10 +142,17 @@ func (c pusherConsumer) Consume(ctx context.Context, records []record) error { func (c pusherConsumer) newStorageWriter() PusherCloser { if c.kafkaConfig.IngestionConcurrency == 0 { - return newSequentialStoragePusher(c.metrics.storagePusherMetrics, c.pusher) + return newSequentialStoragePusher(c.metrics.storagePusherMetrics, c.pusher, c.kafkaConfig.FallbackClientErrorSampleRate, c.logger) } - return newParallelStoragePusher(c.metrics.storagePusherMetrics, c.pusher, c.kafkaConfig.IngestionConcurrency, c.kafkaConfig.IngestionConcurrencyBatchSize, c.logger) + return newParallelStoragePusher( + c.metrics.storagePusherMetrics, + c.pusher, + c.kafkaConfig.FallbackClientErrorSampleRate, + c.kafkaConfig.IngestionConcurrency, + c.kafkaConfig.IngestionConcurrencyBatchSize, + c.logger, + ) } func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req *mimirpb.WriteRequest, writer PusherCloser) error { @@ -155,92 +161,26 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req // Note that the implementation of the Pusher expects the tenantID to be in the context. ctx = user.InjectOrgID(ctx, tenantID) + err := writer.PushToStorage(ctx, req) return err } -// clientErrorFilteringPusher filters out client errors and logs them. -// It only returns errors that are not client errors. -type clientErrorFilteringPusher struct { - upstream Pusher - metrics *pusherConsumerMetrics - clientErrSampler *util_log.Sampler - fallbackLogger log.Logger -} - -func newClientErrorFilteringPusher(upstream Pusher, metrics *pusherConsumerMetrics, clientErrSampler *util_log.Sampler, fallbackLogger log.Logger) *clientErrorFilteringPusher { - return &clientErrorFilteringPusher{ - upstream: upstream, - metrics: metrics, - clientErrSampler: clientErrSampler, - fallbackLogger: fallbackLogger, - } -} - -func (p *clientErrorFilteringPusher) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error { - err := p.upstream.PushToStorage(ctx, request) - p.metrics.totalRequests.Inc() - if p.handlePushErr(ctx, err, spanlogger.FromContext(ctx, p.fallbackLogger)) { - return err - } - - return nil -} - -func (p *clientErrorFilteringPusher) handlePushErr(ctx context.Context, err error, spanLog *spanlogger.SpanLogger) bool { - if err == nil { - return false - } - // Only return non-client errors; these will stop the processing of the current Kafka fetches and retry (possibly). - if !mimirpb.IsClientError(err) { - p.metrics.serverErrRequests.Inc() - _ = spanLog.Error(err) - return true - } - - p.metrics.clientErrRequests.Inc() - - // The error could be sampled or marked to be skipped in logs, so we check whether it should be - // logged before doing it. - if keep, reason := p.shouldLogClientError(ctx, err); keep { - if reason != "" { - err = fmt.Errorf("%w (%s)", err, reason) - } - - // This error message is consistent with error message in Prometheus remote-write and OTLP handlers in distributors. - level.Warn(spanLog).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "insight", true, "err", err) - } - return false -} - -// shouldLogClientError returns whether err should be logged. -func (p *clientErrorFilteringPusher) shouldLogClientError(ctx context.Context, err error) (bool, string) { - var optional middleware.OptionalLogging - if !errors.As(err, &optional) { - // If error isn't sampled yet, we wrap it into our sampler and try again. - err = p.clientErrSampler.WrapError(err) - if !errors.As(err, &optional) { - // We can get here if c.clientErrSampler is nil. - return true, "" - } - } - - return optional.ShouldLog(ctx) -} - // sequentialStoragePusher receives mimirpb.WriteRequest which are then pushed to the storage one by one. type sequentialStoragePusher struct { - metrics *storagePusherMetrics + metrics *storagePusherMetrics + errorHandler *pushErrorHandler pusher Pusher } // newSequentialStoragePusher creates a new sequentialStoragePusher instance. -func newSequentialStoragePusher(metrics *storagePusherMetrics, pusher Pusher) sequentialStoragePusher { +func newSequentialStoragePusher(metrics *storagePusherMetrics, pusher Pusher, sampleRate int64, logger log.Logger) sequentialStoragePusher { return sequentialStoragePusher{ - metrics: metrics, - pusher: pusher, + metrics: metrics, + pusher: pusher, + errorHandler: newPushErrorHandler(metrics, util_log.NewSampler(sampleRate), logger), } } @@ -251,7 +191,11 @@ func (ssp sequentialStoragePusher) PushToStorage(ctx context.Context, wr *mimirp ssp.metrics.processingTime.WithLabelValues(requestContents(wr)).Observe(time.Since(now).Seconds()) }(time.Now()) - return ssp.pusher.PushToStorage(ctx, wr) + if err := ssp.pusher.PushToStorage(ctx, wr); ssp.errorHandler.IsServerError(ctx, err) { + return err + } + + return nil } // Close implements the PusherCloser interface. @@ -268,17 +212,19 @@ type parallelStoragePusher struct { // pushers is map["$tenant|$source"]*parallelStorageShards pushers map[string]*parallelStorageShards upstreamPusher Pusher + errorHandler *pushErrorHandler numShards int batchSize int } // newParallelStoragePusher creates a new parallelStoragePusher instance. -func newParallelStoragePusher(metrics *storagePusherMetrics, pusher Pusher, numShards int, batchSize int, logger log.Logger) *parallelStoragePusher { +func newParallelStoragePusher(metrics *storagePusherMetrics, pusher Pusher, sampleRate int64, numShards int, batchSize int, logger log.Logger) *parallelStoragePusher { return ¶llelStoragePusher{ logger: log.With(logger, "component", "parallel-storage-pusher"), pushers: make(map[string]*parallelStorageShards), upstreamPusher: pusher, numShards: numShards, + errorHandler: newPushErrorHandler(metrics, util_log.NewSampler(sampleRate), logger), batchSize: batchSize, metrics: metrics, } @@ -325,7 +271,8 @@ type labelsHashFunc func(labels.Labels) uint64 // parallelStorageShards is a collection of shards that are used to parallelize the writes to the storage by series. // Each series is hashed to a shard that contains its own batchingQueue. type parallelStorageShards struct { - metrics *storagePusherMetrics + metrics *storagePusherMetrics + errorHandler *pushErrorHandler pusher Pusher hashLabels labelsHashFunc @@ -376,7 +323,7 @@ func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request * shard := p.hashLabels(nonCopiedLabels) % uint64(p.numShards) if err := p.shards[shard].AddToBatch(ctx, request.Source, ts); err != nil { - return fmt.Errorf("encountered a non-client error when ingesting shard; this error was for a previous write request for the same tenant: %w", err) + return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err) } } @@ -384,13 +331,7 @@ func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request * shard := 0 for mdIdx := range request.Metadata { if err := p.shards[shard].AddMetadataToBatch(ctx, request.Source, request.Metadata[mdIdx]); err != nil { - // TODO: Technically, we should determine at this point what type of error it is and abort the whole push if it's a server error. - // We'll do that in the next PR as otherwise it's too many changes right now. - if !mimirpb.IsClientError(err) { - return err - } - - errs.Add(err) + return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err) } shard++ shard %= p.numShards @@ -440,8 +381,10 @@ func (p *parallelStorageShards) run(queue *batchingQueue) { err := p.pusher.PushToStorage(wr.Context, wr.WriteRequest) + // The error handler needs to determine if this is a server error or not. + // If it is, we need to stop processing as the batch will be retried. When is not (client error), it'll log it, and we can continue processing. p.metrics.processingTime.WithLabelValues(requestContents(wr.WriteRequest)).Observe(time.Since(processingStart).Seconds()) - if err != nil { + if err != nil && p.errorHandler.IsServerError(wr.Context, err) { queue.ErrorChannel() <- err } } @@ -461,6 +404,73 @@ func requestContents(request *mimirpb.WriteRequest) string { } } +// pushErrorHandler filters out client errors and logs them. +// It only returns errors that are not client errors. +type pushErrorHandler struct { + metrics *storagePusherMetrics + clientErrSampler *util_log.Sampler + fallbackLogger log.Logger +} + +// newPushErrorHandler creates a new pushErrorHandler instance. +func newPushErrorHandler(metrics *storagePusherMetrics, clientErrSampler *util_log.Sampler, fallbackLogger log.Logger) *pushErrorHandler { + return &pushErrorHandler{ + metrics: metrics, + clientErrSampler: clientErrSampler, + fallbackLogger: fallbackLogger, + } +} + +// IsServerError returns whether the error is a server error or not, the context is used to extract the span from the trace. +// When the error is a server error, we'll add it to the span passed down in the context and return true to indicate that the we should stop processing. +// When it is a client error, we'll add it to the span and log it to stdout/stderr. +func (p *pushErrorHandler) IsServerError(ctx context.Context, err error) bool { + // For every request, we have to determine if it's a server error. + // For the sake of simplicity, let's increment the total requests counter here. + p.metrics.totalRequests.Inc() + + spanLog := spanlogger.FromContext(ctx, p.fallbackLogger) + if err == nil { + return false + } + + // Only return non-client errors; these will stop the processing of the current Kafka fetches and retry (possibly). + if !mimirpb.IsClientError(err) { + p.metrics.serverErrRequests.Inc() + _ = spanLog.Error(err) + return true + } + + p.metrics.clientErrRequests.Inc() + + // The error could be sampled or marked to be skipped in logs, so we check whether it should be + // logged before doing it. + if keep, reason := p.shouldLogClientError(ctx, err); keep { + if reason != "" { + err = fmt.Errorf("%w (%s)", err, reason) + } + + // This error message is consistent with error message in Prometheus remote-write and OTLP handlers in distributors. + level.Warn(spanLog).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "insight", true, "err", err) + } + return false +} + +// shouldLogClientError returns whether err should be logged. +func (p *pushErrorHandler) shouldLogClientError(ctx context.Context, err error) (bool, string) { + var optional middleware.OptionalLogging + if !errors.As(err, &optional) { + // If error isn't sampled yet, we wrap it into our sampler and try again. + err = p.clientErrSampler.WrapError(err) + if !errors.As(err, &optional) { + // We can get here if c.clientErrSampler is nil. + return true, "" + } + } + + return optional.ShouldLog(ctx) +} + // batchingQueue is a queue that batches the incoming time series according to the batch size. // Once the batch size is reached, the batch is pushed to a channel which can be accessed through the Channel() method. type batchingQueue struct { diff --git a/pkg/storage/ingest/pusher_metrics.go b/pkg/storage/ingest/pusher_metrics.go index d8bc231521c..8b5b3d1ff07 100644 --- a/pkg/storage/ingest/pusher_metrics.go +++ b/pkg/storage/ingest/pusher_metrics.go @@ -12,20 +12,12 @@ import ( // pusherConsumerMetrics holds the metrics for the pusherConsumer. type pusherConsumerMetrics struct { processingTimeSeconds prometheus.Observer - clientErrRequests prometheus.Counter - serverErrRequests prometheus.Counter - totalRequests prometheus.Counter storagePusherMetrics *storagePusherMetrics } // newPusherConsumerMetrics creates a new pusherConsumerMetrics instance. func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics { - errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingest_storage_reader_records_failed_total", - Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.", - }, []string{"cause"}) - return &pusherConsumerMetrics{ storagePusherMetrics: newStoragePusherMetrics(reg), processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ @@ -36,13 +28,6 @@ func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics NativeHistogramMinResetDuration: 1 * time.Hour, Buckets: prometheus.DefBuckets, }), - - clientErrRequests: errRequestsCounter.WithLabelValues("client"), - serverErrRequests: errRequestsCounter.WithLabelValues("server"), - totalRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingest_storage_reader_records_total", - Help: "Number of attempted records (write requests).", - }), } } @@ -53,10 +38,18 @@ type storagePusherMetrics struct { processingTime *prometheus.HistogramVec timeSeriesPerFlush prometheus.Histogram batchingQueueMetrics *batchingQueueMetrics + clientErrRequests prometheus.Counter + serverErrRequests prometheus.Counter + totalRequests prometheus.Counter } // newStoragePusherMetrics creates a new storagePusherMetrics instance. func newStoragePusherMetrics(reg prometheus.Registerer) *storagePusherMetrics { + errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_records_failed_total", + Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.", + }, []string{"cause"}) + return &storagePusherMetrics{ batchingQueueMetrics: newBatchingQueueMetrics(reg), batchAge: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ @@ -74,6 +67,12 @@ func newStoragePusherMetrics(reg prometheus.Registerer) *storagePusherMetrics { Help: "Number of time series pushed in each batch to an ingestion shard. A lower number than -ingest-storage.kafka.ingestion-concurrency-batch-size indicates that shards are not filling up and may not be parallelizing ingestion as efficiently.", NativeHistogramBucketFactor: 1.1, }), + clientErrRequests: errRequestsCounter.WithLabelValues("client"), + serverErrRequests: errRequestsCounter.WithLabelValues("server"), + totalRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_records_total", + Help: "Number of attempted records (write requests).", + }), } } diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go index 4c192852af2..8dd2d37447e 100644 --- a/pkg/storage/ingest/pusher_test.go +++ b/pkg/storage/ingest/pusher_test.go @@ -279,7 +279,7 @@ func TestClientErrorFilteringPusher_PushToStorage(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - c := newClientErrorFilteringPusher(nil, newPusherConsumerMetrics(prometheus.NewPedanticRegistry()), tc.sampler, log.NewNopLogger()) + c := newPushErrorHandler(nil, newPusherConsumerMetrics(prometheus.NewPedanticRegistry()), tc.sampler, log.NewNopLogger()) sampled, reason := c.shouldLogClientError(context.Background(), tc.err) assert.Equal(t, tc.expectedSampled, sampled) From eafc5a3bf5a87156c4a698982d1c43389a989ce5 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 25 Sep 2024 18:42:35 +0200 Subject: [PATCH 4/5] Use pushErrorHandler in parallelStorageShards Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/pusher.go | 19 ++++++++++--------- pkg/storage/ingest/pusher_test.go | 7 ++++--- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go index 90c74f3b0c6..7e2dc698c6e 100644 --- a/pkg/storage/ingest/pusher.go +++ b/pkg/storage/ingest/pusher.go @@ -261,7 +261,7 @@ func (c parallelStoragePusher) shardsFor(userID string, requestSource mimirpb.Wr } // Use the same hashing function that's used for stripes in the TSDB. That way we make use of the low-contention property of stripes. hashLabels := labels.Labels.Hash - p := newParallelStorageShards(c.metrics, c.numShards, c.batchSize, batchingQueueCapacity, c.upstreamPusher, hashLabels) + p := newParallelStorageShards(c.metrics, c.errorHandler, c.numShards, c.batchSize, batchingQueueCapacity, c.upstreamPusher, hashLabels) c.pushers[userID+"|"+requestSource.String()] = p return p } @@ -293,15 +293,16 @@ type flushableWriteRequest struct { } // newParallelStorageShards creates a new parallelStorageShards instance. -func newParallelStorageShards(metrics *storagePusherMetrics, numShards int, batchSize int, capacity int, pusher Pusher, hashLabels labelsHashFunc) *parallelStorageShards { +func newParallelStorageShards(metrics *storagePusherMetrics, errorHandler *pushErrorHandler, numShards int, batchSize int, capacity int, pusher Pusher, hashLabels labelsHashFunc) *parallelStorageShards { p := ¶llelStorageShards{ - numShards: numShards, - pusher: pusher, - hashLabels: hashLabels, - capacity: capacity, - metrics: metrics, - batchSize: batchSize, - wg: &sync.WaitGroup{}, + numShards: numShards, + pusher: pusher, + errorHandler: errorHandler, + hashLabels: hashLabels, + capacity: capacity, + metrics: metrics, + batchSize: batchSize, + wg: &sync.WaitGroup{}, } p.start() diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go index 8dd2d37447e..e3db42303f5 100644 --- a/pkg/storage/ingest/pusher_test.go +++ b/pkg/storage/ingest/pusher_test.go @@ -279,7 +279,7 @@ func TestClientErrorFilteringPusher_PushToStorage(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - c := newPushErrorHandler(nil, newPusherConsumerMetrics(prometheus.NewPedanticRegistry()), tc.sampler, log.NewNopLogger()) + c := newPushErrorHandler(newStoragePusherMetrics(prometheus.NewPedanticRegistry()), tc.sampler, log.NewNopLogger()) sampled, reason := c.shouldLogClientError(context.Background(), tc.err) assert.Equal(t, tc.expectedSampled, sampled) @@ -667,7 +667,8 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) { // run with a buffer of one, so some of the tests can fill the buffer and test the error handling const buffer = 1 metrics := newStoragePusherMetrics(prometheus.NewPedanticRegistry()) - shardingP := newParallelStorageShards(metrics, tc.shardCount, tc.batchSize, buffer, pusher, labels.StableHash) + errorHandler := newPushErrorHandler(metrics, nil, log.NewNopLogger()) + shardingP := newParallelStorageShards(metrics, errorHandler, tc.shardCount, tc.batchSize, buffer, pusher, labels.StableHash) for i, req := range tc.expectedUpstreamPushes { pusher.On("PushToStorage", mock.Anything, req).Return(tc.upstreamPushErrs[i]) @@ -830,7 +831,7 @@ func TestParallelStoragePusher(t *testing.T) { }).Return(nil) metrics := newStoragePusherMetrics(prometheus.NewPedanticRegistry()) - psp := newParallelStoragePusher(metrics, pusher, 1, 1, logger) + psp := newParallelStoragePusher(metrics, pusher, 0, 1, 1, logger) // Process requests for _, req := range tc.requests { From 90eb89440ab6fcc0a81864a7790c8daa642f4bb4 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 25 Sep 2024 18:44:58 +0200 Subject: [PATCH 5/5] Rename test Signed-off-by: Dimitar Dimitrov --- pkg/storage/ingest/pusher_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go index e3db42303f5..66820de2484 100644 --- a/pkg/storage/ingest/pusher_test.go +++ b/pkg/storage/ingest/pusher_test.go @@ -239,7 +239,7 @@ func removeUnimportantLogFields(lines []string) []string { return lines } -func TestClientErrorFilteringPusher_PushToStorage(t *testing.T) { +func TestPushErrorHandler_IsServerError(t *testing.T) { type testCase struct { sampler *util_log.Sampler err error