Skip to content

Commit

Permalink
Ingest-storage: Add fallback sampler for client errors (#8216)
Browse files Browse the repository at this point in the history
* Ingest-storage now uses fallback sampler when logging client errors.

Signed-off-by: Peter Štibraný <[email protected]>

* Improve comment.

Signed-off-by: Peter Štibraný <[email protected]>

* Use fallback prefix.

Signed-off-by: Peter Štibraný <[email protected]>

---------

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored May 30, 2024
1 parent 236d9a7 commit eab459c
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 21 deletions.
1 change: 1 addition & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing,
// We use the ingester instance ID as consumer group. This means that we have N consumer groups
// where N is the total number of ingesters. Each ingester is part of their own consumer group
// so that they all replay the owned partition with no gaps.
kafkaCfg.FallbackClientErrorSampleRate = cfg.ErrorSampleRate
i.ingestReader, err = ingest.NewPartitionReaderForPusher(kafkaCfg, i.ingestPartitionID, cfg.IngesterRing.InstanceID, i, log.With(logger, "component", "ingest_reader"), registerer)
if err != nil {
return nil, errors.Wrap(err, "creating ingest storage reader")
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type KafkaConfig struct {

AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"`
AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"`

// Used when logging unsampled client errors. Set from ingester's ErrorSampleRate.
FallbackClientErrorSampleRate int64 `yaml:"-"`
}

func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) {
Expand Down
31 changes: 26 additions & 5 deletions pkg/storage/ingest/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@ package ingest

import (
"context"
"errors"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/cancellation"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/mimir/pkg/mimirpb"
util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

Expand All @@ -29,7 +32,9 @@ type pusherConsumer struct {
clientErrRequests prometheus.Counter
serverErrRequests prometheus.Counter
totalRequests prometheus.Counter
logger log.Logger

fallbackClientErrSampler *util_log.Sampler // Fallback log message sampler client errors that are not sampled yet.
logger log.Logger
}

type parsedRecord struct {
Expand All @@ -40,15 +45,16 @@ type parsedRecord struct {
err error
}

func newPusherConsumer(p Pusher, reg prometheus.Registerer, l log.Logger) *pusherConsumer {
func newPusherConsumer(p Pusher, fallbackClientErrSampler *util_log.Sampler, reg prometheus.Registerer, l log.Logger) *pusherConsumer {
errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_records_failed_total",
Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.",
}, []string{"cause"})

return &pusherConsumer{
pusher: p,
logger: l,
pusher: p,
logger: l,
fallbackClientErrSampler: fallbackClientErrSampler,
processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingest_storage_reader_processing_time_seconds",
Help: "Time taken to process a single record (write request).",
Expand Down Expand Up @@ -118,7 +124,7 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req

// The error could be sampled or marked to be skipped in logs, so we check whether it should be
// logged before doing it.
if keep, reason := shouldLog(ctx, err); keep {
if keep, reason := c.shouldLogClientError(ctx, err); keep {
if reason != "" {
err = fmt.Errorf("%w (%s)", err, reason)
}
Expand All @@ -128,6 +134,21 @@ func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req
return nil
}

// shouldLogClientError returns whether err should be logged.
func (c pusherConsumer) shouldLogClientError(ctx context.Context, err error) (bool, string) {
var optional middleware.OptionalLogging
if !errors.As(err, &optional) {
// If error isn't sampled yet, we wrap it into our sampler and try again.
err = c.fallbackClientErrSampler.WrapError(err)
if !errors.As(err, &optional) {
// We can get here if c.clientErrSampler is nil.
return true, ""
}
}

return optional.ShouldLog(ctx)
}

// The passed context is expected to be cancelled after all items in records were fully processed and are ready
// to be released. This so to guaranty the release of resources associated with each parsedRecord context.
func (c pusherConsumer) unmarshalRequests(ctx context.Context, records []record, recC chan<- parsedRecord) {
Expand Down
55 changes: 52 additions & 3 deletions pkg/storage/ingest/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestPusherConsumer(t *testing.T) {
})

logs := &concurrency.SyncBuffer{}
c := newPusherConsumer(pusher, prometheus.NewPedanticRegistry(), log.NewLogfmtLogger(logs))
c := newPusherConsumer(pusher, nil, prometheus.NewPedanticRegistry(), log.NewLogfmtLogger(logs))
err := c.consume(context.Background(), tc.records)
if tc.expErr == "" {
assert.NoError(t, err)
Expand All @@ -217,6 +217,55 @@ func TestPusherConsumer(t *testing.T) {
}
}

func TestPusherConsumer_clientErrorSampling(t *testing.T) {
type testCase struct {
sampler *util_log.Sampler
err error
expectedSampled bool
expectedReason string
}

plainError := fmt.Errorf("plain")

for name, tc := range map[string]testCase{
"nil sampler, plain error": {
sampler: nil,
err: plainError,
expectedSampled: true,
expectedReason: "",
},

"nil sampler, sampled error": {
sampler: nil,
err: util_log.NewSampler(20).WrapError(plainError), // need to use new sampler to make sure it samples the error
expectedSampled: true,
expectedReason: "sampled 1/20",
},

"fallback sampler, plain error": {
sampler: util_log.NewSampler(5),
err: plainError,
expectedSampled: true,
expectedReason: "sampled 1/5",
},

"fallback sampler, sampled error": {
sampler: util_log.NewSampler(5),
err: util_log.NewSampler(20).WrapError(plainError), // need to use new sampler to make sure it samples the error
expectedSampled: true,
expectedReason: "sampled 1/20",
},
} {
t.Run(name, func(t *testing.T) {
c := newPusherConsumer(nil, tc.sampler, prometheus.NewPedanticRegistry(), log.NewNopLogger())

sampled, reason := c.shouldLogClientError(context.Background(), tc.err)
assert.Equal(t, tc.expectedSampled, sampled)
assert.Equal(t, tc.expectedReason, reason)
})
}
}

func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testing.T) {
// Create a request that will be used in this test. The content doesn't matter,
// since we only test errors.
Expand All @@ -237,7 +286,7 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin

reg := prometheus.NewPedanticRegistry()
logs := &concurrency.SyncBuffer{}
consumer := newPusherConsumer(pusher, reg, log.NewLogfmtLogger(logs))
consumer := newPusherConsumer(pusher, nil, reg, log.NewLogfmtLogger(logs))

return consumer, logs, reg
}
Expand Down Expand Up @@ -323,7 +372,7 @@ func TestPusherConsumer_consume_ShouldHonorContextCancellation(t *testing.T) {
<-ctx.Done()
return context.Cause(ctx)
})
consumer := newPusherConsumer(pusher, prometheus.NewPedanticRegistry(), log.NewNopLogger())
consumer := newPusherConsumer(pusher, nil, prometheus.NewPedanticRegistry(), log.NewNopLogger())

wantCancelErr := cancellation.NewErrorf("stop")

Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/twmb/franz-go/plugin/kprom"
"go.uber.org/atomic"

util_log "github.com/grafana/mimir/pkg/util/log"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

Expand Down Expand Up @@ -71,7 +72,7 @@ type PartitionReader struct {
}

func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, instanceID string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error) {
consumer := newPusherConsumer(pusher, reg, logger)
consumer := newPusherConsumer(pusher, util_log.NewSampler(kafkaCfg.FallbackClientErrorSampleRate), reg, logger)
return newPartitionReader(kafkaCfg, partitionID, instanceID, consumer, logger, reg)
}

Expand Down
12 changes: 0 additions & 12 deletions pkg/storage/ingest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/middleware"
"github.com/grafana/regexp"
"github.com/pkg/errors"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
Expand Down Expand Up @@ -131,16 +129,6 @@ func (w *resultPromise[T]) wait(ctx context.Context) (T, error) {
}
}

// shouldLog returns whether err should be logged.
func shouldLog(ctx context.Context, err error) (bool, string) {
var optional middleware.OptionalLogging
if !errors.As(err, &optional) {
return true, ""
}

return optional.ShouldLog(ctx)
}

// setDefaultNumberOfPartitionsForAutocreatedTopics tries to set num.partitions config option on brokers.
// This is best-effort, if setting the option fails, error is logged, but not returned.
func setDefaultNumberOfPartitionsForAutocreatedTopics(cfg KafkaConfig, logger log.Logger) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/log/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,12 @@ func TestSampledError_ShouldImplementOptionalLoggingInterface(t *testing.T) {
var optionalLoggingErr middleware.OptionalLogging
require.ErrorAs(t, sampledErr, &optionalLoggingErr)
}

func TestNilSampler(t *testing.T) {
var s *Sampler
err := fmt.Errorf("error")

sampledErr := s.WrapError(err)
require.NotNil(t, sampledErr)
require.Equal(t, err, sampledErr)
}

0 comments on commit eab459c

Please sign in to comment.