From eab459c3a14eec0f0f65506b6536054b89fcb953 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20=C5=A0tibran=C3=BD?= Date: Thu, 30 May 2024 11:36:43 +0200 Subject: [PATCH] Ingest-storage: Add fallback sampler for client errors (#8216) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Ingest-storage now uses fallback sampler when logging client errors. Signed-off-by: Peter Štibraný * Improve comment. Signed-off-by: Peter Štibraný * Use fallback prefix. Signed-off-by: Peter Štibraný --------- Signed-off-by: Peter Štibraný --- pkg/ingester/ingester.go | 1 + pkg/storage/ingest/config.go | 3 ++ pkg/storage/ingest/pusher.go | 31 ++++++++++++++--- pkg/storage/ingest/pusher_test.go | 55 +++++++++++++++++++++++++++++-- pkg/storage/ingest/reader.go | 3 +- pkg/storage/ingest/util.go | 12 ------- pkg/util/log/sampler_test.go | 9 +++++ 7 files changed, 93 insertions(+), 21 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index bb3185d8c50..240bee2cc6d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -444,6 +444,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, // We use the ingester instance ID as consumer group. This means that we have N consumer groups // where N is the total number of ingesters. Each ingester is part of their own consumer group // so that they all replay the owned partition with no gaps. 
+ kafkaCfg.FallbackClientErrorSampleRate = cfg.ErrorSampleRate i.ingestReader, err = ingest.NewPartitionReaderForPusher(kafkaCfg, i.ingestPartitionID, cfg.IngesterRing.InstanceID, i, log.With(logger, "component", "ingest_reader"), registerer) if err != nil { return nil, errors.Wrap(err, "creating ingest storage reader") diff --git a/pkg/storage/ingest/config.go b/pkg/storage/ingest/config.go index 63cd9124b32..743ee0d35f5 100644 --- a/pkg/storage/ingest/config.go +++ b/pkg/storage/ingest/config.go @@ -76,6 +76,9 @@ type KafkaConfig struct { AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"` AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"` + + // Used when logging unsampled client errors. Set from ingester's ErrorSampleRate. + FallbackClientErrorSampleRate int64 `yaml:"-"` } func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) { diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go index 0b0bdc0fc43..6dee28b63c9 100644 --- a/pkg/storage/ingest/pusher.go +++ b/pkg/storage/ingest/pusher.go @@ -4,17 +4,20 @@ package ingest import ( "context" + "errors" "fmt" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/cancellation" + "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/user" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/mimir/pkg/mimirpb" + util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -29,7 +32,9 @@ type pusherConsumer struct { clientErrRequests prometheus.Counter serverErrRequests prometheus.Counter totalRequests prometheus.Counter - logger log.Logger + + fallbackClientErrSampler *util_log.Sampler // Fallback log message sampler for client errors that are not sampled yet. 
+ logger log.Logger } type parsedRecord struct { @@ -40,15 +45,16 @@ type parsedRecord struct { err error } -func newPusherConsumer(p Pusher, reg prometheus.Registerer, l log.Logger) *pusherConsumer { +func newPusherConsumer(p Pusher, fallbackClientErrSampler *util_log.Sampler, reg prometheus.Registerer, l log.Logger) *pusherConsumer { errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingest_storage_reader_records_failed_total", Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.", }, []string{"cause"}) return &pusherConsumer{ - pusher: p, - logger: l, + pusher: p, + logger: l, + fallbackClientErrSampler: fallbackClientErrSampler, processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_ingest_storage_reader_processing_time_seconds", Help: "Time taken to process a single record (write request).", @@ -118,7 +124,7 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req // The error could be sampled or marked to be skipped in logs, so we check whether it should be // logged before doing it. - if keep, reason := shouldLog(ctx, err); keep { + if keep, reason := c.shouldLogClientError(ctx, err); keep { if reason != "" { err = fmt.Errorf("%w (%s)", err, reason) } @@ -128,6 +134,21 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req return nil } +// shouldLogClientError returns whether err should be logged. +func (c pusherConsumer) shouldLogClientError(ctx context.Context, err error) (bool, string) { + var optional middleware.OptionalLogging + if !errors.As(err, &optional) { + // If error isn't sampled yet, we wrap it into our sampler and try again. + err = c.fallbackClientErrSampler.WrapError(err) + if !errors.As(err, &optional) { + // We can get here if c.fallbackClientErrSampler is nil. 
+ return true, "" + } + } + + return optional.ShouldLog(ctx) +} + // The passed context is expected to be cancelled after all items in records were fully processed and are ready // to be released. This so to guaranty the release of resources associated with each parsedRecord context. func (c pusherConsumer) unmarshalRequests(ctx context.Context, records []record, recC chan<- parsedRecord) { diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go index c4444218343..822243b3d28 100644 --- a/pkg/storage/ingest/pusher_test.go +++ b/pkg/storage/ingest/pusher_test.go @@ -200,7 +200,7 @@ func TestPusherConsumer(t *testing.T) { }) logs := &concurrency.SyncBuffer{} - c := newPusherConsumer(pusher, prometheus.NewPedanticRegistry(), log.NewLogfmtLogger(logs)) + c := newPusherConsumer(pusher, nil, prometheus.NewPedanticRegistry(), log.NewLogfmtLogger(logs)) err := c.consume(context.Background(), tc.records) if tc.expErr == "" { assert.NoError(t, err) @@ -217,6 +217,55 @@ func TestPusherConsumer(t *testing.T) { } } +func TestPusherConsumer_clientErrorSampling(t *testing.T) { + type testCase struct { + sampler *util_log.Sampler + err error + expectedSampled bool + expectedReason string + } + + plainError := fmt.Errorf("plain") + + for name, tc := range map[string]testCase{ + "nil sampler, plain error": { + sampler: nil, + err: plainError, + expectedSampled: true, + expectedReason: "", + }, + + "nil sampler, sampled error": { + sampler: nil, + err: util_log.NewSampler(20).WrapError(plainError), // need to use new sampler to make sure it samples the error + expectedSampled: true, + expectedReason: "sampled 1/20", + }, + + "fallback sampler, plain error": { + sampler: util_log.NewSampler(5), + err: plainError, + expectedSampled: true, + expectedReason: "sampled 1/5", + }, + + "fallback sampler, sampled error": { + sampler: util_log.NewSampler(5), + err: util_log.NewSampler(20).WrapError(plainError), // need to use new sampler to make sure it samples the 
error + expectedSampled: true, + expectedReason: "sampled 1/20", + }, + } { + t.Run(name, func(t *testing.T) { + c := newPusherConsumer(nil, tc.sampler, prometheus.NewPedanticRegistry(), log.NewNopLogger()) + + sampled, reason := c.shouldLogClientError(context.Background(), tc.err) + assert.Equal(t, tc.expectedSampled, sampled) + assert.Equal(t, tc.expectedReason, reason) + }) + } +} + func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testing.T) { // Create a request that will be used in this test. The content doesn't matter, // since we only test errors. @@ -237,7 +286,7 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin reg := prometheus.NewPedanticRegistry() logs := &concurrency.SyncBuffer{} - consumer := newPusherConsumer(pusher, reg, log.NewLogfmtLogger(logs)) + consumer := newPusherConsumer(pusher, nil, reg, log.NewLogfmtLogger(logs)) return consumer, logs, reg } @@ -323,7 +372,7 @@ func TestPusherConsumer_consume_ShouldHonorContextCancellation(t *testing.T) { <-ctx.Done() return context.Cause(ctx) }) - consumer := newPusherConsumer(pusher, prometheus.NewPedanticRegistry(), log.NewNopLogger()) + consumer := newPusherConsumer(pusher, nil, prometheus.NewPedanticRegistry(), log.NewNopLogger()) wantCancelErr := cancellation.NewErrorf("stop") diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 9eb02efd166..1ffac26fa20 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -23,6 +23,7 @@ import ( "github.com/twmb/franz-go/plugin/kprom" "go.uber.org/atomic" + util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -71,7 +72,7 @@ type PartitionReader struct { } func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, instanceID string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error) { - consumer := newPusherConsumer(pusher, reg, logger) + consumer := 
newPusherConsumer(pusher, util_log.NewSampler(kafkaCfg.FallbackClientErrorSampleRate), reg, logger) return newPartitionReader(kafkaCfg, partitionID, instanceID, consumer, logger, reg) } diff --git a/pkg/storage/ingest/util.go b/pkg/storage/ingest/util.go index fe56c0df8b2..87d3faffc48 100644 --- a/pkg/storage/ingest/util.go +++ b/pkg/storage/ingest/util.go @@ -10,9 +10,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/middleware" "github.com/grafana/regexp" - "github.com/pkg/errors" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" @@ -131,16 +129,6 @@ func (w *resultPromise[T]) wait(ctx context.Context) (T, error) { } } -// shouldLog returns whether err should be logged. -func shouldLog(ctx context.Context, err error) (bool, string) { - var optional middleware.OptionalLogging - if !errors.As(err, &optional) { - return true, "" - } - - return optional.ShouldLog(ctx) -} - // setDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers. // This is best-effort, if setting the option fails, error is logged, but not returned. func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg KafkaConfig, logger log.Logger) { diff --git a/pkg/util/log/sampler_test.go b/pkg/util/log/sampler_test.go index 6ed42b6f870..8e0334a6158 100644 --- a/pkg/util/log/sampler_test.go +++ b/pkg/util/log/sampler_test.go @@ -62,3 +62,12 @@ func TestSampledError_ShouldImplementOptionalLoggingInterface(t *testing.T) { var optionalLoggingErr middleware.OptionalLogging require.ErrorAs(t, sampledErr, &optionalLoggingErr) } + +func TestNilSampler(t *testing.T) { + var s *Sampler + err := fmt.Errorf("error") + + sampledErr := s.WrapError(err) + require.NotNil(t, sampledErr) + require.Equal(t, err, sampledErr) +}