From 456bbfde5745fcb711cee460d208c4e073aa5974 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Mon, 30 Sep 2024 18:29:53 +0200 Subject: [PATCH] kafka replay speed: upstream push sharding (#9454) * kafka replay speed: upstream push sharding Signed-off-by: Dimitar Dimitrov Co-authored-by: gotjosh Signed-off-by: Dimitar Dimitrov * Apply suggestions from code review Co-authored-by: gotjosh --------- Signed-off-by: Dimitar Dimitrov Co-authored-by: gotjosh --- cmd/mimir/config-descriptor.json | 20 + cmd/mimir/help-all.txt.tmpl | 4 + cmd/mimir/help.txt.tmpl | 4 + .../configuration-parameters/index.md | 11 + .../mimir-mixin/dashboards/writes.libsonnet | 8 +- pkg/storage/ingest/config.go | 6 + pkg/storage/ingest/pusher.go | 628 ++++++++++--- pkg/storage/ingest/pusher_metrics.go | 97 ++ pkg/storage/ingest/pusher_test.go | 836 ++++++++++++++++-- pkg/storage/ingest/reader.go | 34 +- pkg/storage/ingest/reader_test.go | 12 +- pkg/storage/ingest/writer_test.go | 21 + 12 files changed, 1504 insertions(+), 177 deletions(-) create mode 100644 pkg/storage/ingest/pusher_metrics.go diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 61ecea7977b..a419297bab6 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -6678,6 +6678,26 @@ "fieldDefaultValue": true, "fieldFlag": "ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes", "fieldType": "boolean" + }, + { + "kind": "field", + "name": "ingestion_concurrency", + "required": false, + "desc": "The number of concurrent ingestion streams to the TSDB head. Every tenant has their own set of streams. 0 to disable.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "ingest-storage.kafka.ingestion-concurrency", + "fieldType": "int" + }, + { + "kind": "field", + "name": "ingestion_concurrency_batch_size", + "required": false, + "desc": "The number of timeseries to batch together before ingesting into TSDB. This is only used when -ingest-storage.kafka.ingestion-concurrency is greater than 0.", + "fieldValue": null, + "fieldDefaultValue": 150, + "fieldFlag": "ingest-storage.kafka.ingestion-concurrency-batch-size", + "fieldType": "int" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index eeb11e588b7..2924c62e848 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1363,6 +1363,10 @@ Usage of ./cmd/mimir/mimir: How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s) -ingest-storage.kafka.dial-timeout duration The maximum time allowed to open a connection to a Kafka broker. (default 2s) + -ingest-storage.kafka.ingestion-concurrency int + The number of concurrent ingestion streams to the TSDB head. Every tenant has their own set of streams. 0 to disable. + -ingest-storage.kafka.ingestion-concurrency-batch-size int + The number of timeseries to batch together before ingesting into TSDB. This is only used when -ingest-storage.kafka.ingestion-concurrency is greater than 0. (default 150) -ingest-storage.kafka.last-produced-offset-poll-interval duration How frequently to poll the last produced offset, used to enforce strong read consistency. (default 1s) -ingest-storage.kafka.last-produced-offset-retry-timeout duration diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index d337867bfbd..f556d397e28 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -411,6 +411,10 @@ Usage of ./cmd/mimir/mimir: How frequently a consumer should commit the consumed offset to Kafka. The last committed offset is used at startup to continue the consumption from where it was left. (default 1s) -ingest-storage.kafka.dial-timeout duration The maximum time allowed to open a connection to a Kafka broker. (default 2s) + -ingest-storage.kafka.ingestion-concurrency int + The number of concurrent ingestion streams to the TSDB head. Every tenant has their own set of streams. 0 to disable. + -ingest-storage.kafka.ingestion-concurrency-batch-size int + The number of timeseries to batch together before ingesting into TSDB. This is only used when -ingest-storage.kafka.ingestion-concurrency is greater than 0. (default 150) -ingest-storage.kafka.last-produced-offset-poll-interval duration How frequently to poll the last produced offset, used to enforce strong read consistency. (default 1s) -ingest-storage.kafka.last-produced-offset-retry-timeout duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index f7bcc4814fe..34c6d534883 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3873,6 +3873,17 @@ kafka: # CLI flag: -ingest-storage.kafka.use-compressed-bytes-as-fetch-max-bytes [use_compressed_bytes_as_fetch_max_bytes: | default = true] + # The number of concurrent ingestion streams to the TSDB head. Every tenant + # has their own set of streams. 0 to disable. + # CLI flag: -ingest-storage.kafka.ingestion-concurrency + [ingestion_concurrency: | default = 0] + + # The number of timeseries to batch together before ingesting into TSDB. This + # is only used when -ingest-storage.kafka.ingestion-concurrency is greater + # than 0. + # CLI flag: -ingest-storage.kafka.ingestion-concurrency-batch-size + [ingestion_concurrency_batch_size: | default = 150] + migration: # When both this option and ingest storage are enabled, distributors write to # both Kafka and ingesters. A write request is considered successful only when diff --git a/operations/mimir-mixin/dashboards/writes.libsonnet b/operations/mimir-mixin/dashboards/writes.libsonnet index d345ef368fe..dc03b5931a1 100644 --- a/operations/mimir-mixin/dashboards/writes.libsonnet +++ b/operations/mimir-mixin/dashboards/writes.libsonnet @@ -274,10 +274,10 @@ local filename = 'mimir-writes.json'; ) + $.queryPanel( [ - 'histogram_avg(sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], - 'histogram_quantile(0.99, sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], - 'histogram_quantile(0.999, sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], - 'histogram_quantile(1.0, sum(rate(cortex_ingest_storage_reader_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], + 'histogram_avg(sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], + 'histogram_quantile(0.99, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], + 'histogram_quantile(0.999, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], + 'histogram_quantile(1.0, sum(rate(cortex_ingest_storage_reader_records_processing_time_seconds{%s}[$__rate_interval])))' % [$.jobMatcher($._config.job_names.ingester)], ], [ 'avg', diff --git a/pkg/storage/ingest/config.go b/pkg/storage/ingest/config.go index 47aeb685a54..f4d9aeab1e7 100644 --- a/pkg/storage/ingest/config.go +++ b/pkg/storage/ingest/config.go @@ -98,6 +98,9 @@ type KafkaConfig struct { OngoingFetchConcurrency int `yaml:"ongoing_fetch_concurrency"` OngoingRecordsPerFetch int `yaml:"ongoing_records_per_fetch"` UseCompressedBytesAsFetchMaxBytes bool `yaml:"use_compressed_bytes_as_fetch_max_bytes"` + + IngestionConcurrency int `yaml:"ingestion_concurrency"` + IngestionConcurrencyBatchSize int `yaml:"ingestion_concurrency_batch_size"` } func (cfg *KafkaConfig) RegisterFlags(f *flag.FlagSet) { @@ -138,6 +141,9 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) f.IntVar(&cfg.OngoingFetchConcurrency, prefix+".ongoing-fetch-concurrency", 0, "The number of concurrent fetch requests that the ingester makes when reading data continuously from Kafka after startup. Is disabled unless "+prefix+".startup-fetch-concurrency is greater than 0. It must be greater than 0.") f.IntVar(&cfg.OngoingRecordsPerFetch, prefix+".ongoing-records-per-fetch", 30, "The number of records per fetch request that the ingester makes when reading data continuously from Kafka after startup. Depends on "+prefix+".ongoing-fetch-concurrency being greater than 0.") f.BoolVar(&cfg.UseCompressedBytesAsFetchMaxBytes, prefix+".use-compressed-bytes-as-fetch-max-bytes", true, "When enabled, the fetch request MaxBytes field is computed using the compressed size of previous records. When disabled, MaxBytes is computed using uncompressed bytes. Different Kafka implementations interpret MaxBytes differently.") + + f.IntVar(&cfg.IngestionConcurrency, prefix+".ingestion-concurrency", 0, "The number of concurrent ingestion streams to the TSDB head. Every tenant has their own set of streams. 0 to disable.") + f.IntVar(&cfg.IngestionConcurrencyBatchSize, prefix+".ingestion-concurrency-batch-size", 150, "The number of timeseries to batch together before ingesting into TSDB. This is only used when -ingest-storage.kafka.ingestion-concurrency is greater than 0.") } func (cfg *KafkaConfig) Validate() error { diff --git a/pkg/storage/ingest/pusher.go b/pkg/storage/ingest/pusher.go index 46751241427..e249c605ac8 100644 --- a/pkg/storage/ingest/pusher.go +++ b/pkg/storage/ingest/pusher.go @@ -6,141 +6,466 @@ import ( "context" "errors" "fmt" + "math/rand/v2" + "sync" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/cancellation" "github.com/grafana/dskit/middleware" + "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/user" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" "github.com/grafana/mimir/pkg/mimirpb" util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/spanlogger" ) +// batchingQueueCapacity controls how many batches can be enqueued for flushing. +// We don't want to push any batches in parallel and instead want to prepare the next ones while the current one finishes, hence the buffer of 5. +// For example, if we flush 1 batch/sec, then batching 2 batches/sec doesn't make us faster. +// This is our initial assumption, and there's potential in testing with higher numbers if there's a high variability in flush times - assuming we can preserve the order of the batches. For now, we'll stick to 5. +// If there's high variability in the time to flush or in the time to batch, then this buffer might need to be increased. +const batchingQueueCapacity = 5 + type Pusher interface { PushToStorage(context.Context, *mimirpb.WriteRequest) error } +type PusherCloser interface { + // PushToStorage pushes the write request to the storage. + PushToStorage(context.Context, *mimirpb.WriteRequest) error + // Close tells the PusherCloser that no more records are coming and it should flush any remaining records. + Close() []error +} + +// pusherConsumer receives records from Kafka and pushes them to the storage. +// Each time a batch of records is received from Kafka, we instantiate a new pusherConsumer, this is to ensure we can retry if necessary and know whether we have completed that batch or not. type pusherConsumer struct { - pusher Pusher + metrics *pusherConsumerMetrics + logger log.Logger - processingTimeSeconds prometheus.Observer - clientErrRequests prometheus.Counter - serverErrRequests prometheus.Counter - totalRequests prometheus.Counter + kafkaConfig KafkaConfig - fallbackClientErrSampler *util_log.Sampler // Fallback log message sampler client errors that are not sampled yet. - logger log.Logger + pusher Pusher } -type parsedRecord struct { - *mimirpb.WriteRequest - // Context holds the tracing and cancellation data for this record/request. - ctx context.Context - tenantID string - err error +// newPusherConsumer creates a new pusherConsumer instance. +func newPusherConsumer(pusher Pusher, kafkaCfg KafkaConfig, metrics *pusherConsumerMetrics, logger log.Logger) *pusherConsumer { + // The layer below (parallelStoragePusher, parallelStorageShards, sequentialStoragePusher) will return all errors they see + // and potentially ingesting a batch if they encounter any error. + // We can safely ignore client errors and continue ingesting. We abort ingesting if we get any other error. + return &pusherConsumer{ + pusher: pusher, + kafkaConfig: kafkaCfg, + metrics: metrics, + logger: logger, + } } -func newPusherConsumer(p Pusher, fallbackClientErrSampler *util_log.Sampler, reg prometheus.Registerer, l log.Logger) *pusherConsumer { - errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingest_storage_reader_records_failed_total", - Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.", - }, []string{"cause"}) +// Consume implements the recordConsumer interface. +// It'll use a separate goroutine to unmarshal the next record while we push the current record to storage. +func (c pusherConsumer) Consume(ctx context.Context, records []record) error { + defer func(processingStart time.Time) { + c.metrics.processingTimeSeconds.Observe(time.Since(processingStart).Seconds()) + }(time.Now()) - return &pusherConsumer{ - pusher: p, - logger: l, - fallbackClientErrSampler: fallbackClientErrSampler, - processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "cortex_ingest_storage_reader_processing_time_seconds", - Help: "Time taken to process a single record (write request).", - NativeHistogramBucketFactor: 1.1, - NativeHistogramMaxBucketNumber: 100, - NativeHistogramMinResetDuration: 1 * time.Hour, - Buckets: prometheus.DefBuckets, - }), - clientErrRequests: errRequestsCounter.WithLabelValues("client"), - serverErrRequests: errRequestsCounter.WithLabelValues("server"), - totalRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingest_storage_reader_records_total", - Help: "Number of attempted records (write requests).", - }), - } -} - -func (c pusherConsumer) consume(ctx context.Context, records []record) error { + type parsedRecord struct { + *mimirpb.WriteRequest + // ctx holds the tracing baggage for this record/request. + ctx context.Context + tenantID string + err error + index int + } + + recordsChannel := make(chan parsedRecord) + + // Create a cancellable context to let the unmarshalling goroutine know when to stop. ctx, cancel := context.WithCancelCause(ctx) - defer cancel(cancellation.NewErrorf("done consuming records")) - recC := make(chan parsedRecord) + // Now, unmarshal the records into the channel. + go func(unmarshalCtx context.Context, records []record, ch chan<- parsedRecord) { + defer close(ch) - // Speed up consumption by unmarhsalling the next request while the previous one is being pushed. - go c.unmarshalRequests(ctx, records, recC) - return c.pushRequests(recC) -} + for index, r := range records { + // Before we being unmarshalling the write request check if the context was cancelled. + select { + case <-unmarshalCtx.Done(): + // No more processing is needed, so we need to abort. + return + default: + } + + parsed := parsedRecord{ + ctx: r.ctx, + tenantID: r.tenantID, + WriteRequest: &mimirpb.WriteRequest{}, + index: index, + } + + // We don't free the WriteRequest slices because they are being freed by a level below. + err := parsed.WriteRequest.Unmarshal(r.content) + if err != nil { + parsed.err = fmt.Errorf("parsing ingest consumer write request: %w", err) + } -func (c pusherConsumer) pushRequests(reqC <-chan parsedRecord) error { - recordIdx := -1 - for wr := range reqC { - recordIdx++ - if wr.err != nil { - level.Error(c.logger).Log("msg", "failed to parse write request; skipping", "err", wr.err) + // Now that we're done, check again before we send it to the channel. + select { + case <-unmarshalCtx.Done(): + return + case ch <- parsed: + } + } + }(ctx, records, recordsChannel) + + writer := c.newStorageWriter() + for r := range recordsChannel { + if r.err != nil { + level.Error(spanlogger.FromContext(ctx, c.logger)).Log("msg", "failed to parse write request; skipping", "err", r.err) continue } - err := c.pushToStorage(wr.ctx, wr.tenantID, wr.WriteRequest) + // If we get an error at any point, we need to stop processing the records. They will be retried at some point. + err := c.pushToStorage(r.ctx, r.tenantID, r.WriteRequest, writer) if err != nil { - return fmt.Errorf("consuming record at index %d for tenant %s: %w", recordIdx, wr.tenantID, err) + cancel(cancellation.NewErrorf("error while pushing to storage")) // Stop the unmarshalling goroutine. + return fmt.Errorf("consuming record at index %d for tenant %s: %w", r.index, r.tenantID, err) } } - return nil + + cancel(cancellation.NewErrorf("done unmarshalling records")) + + // We need to tell the storage writer that we're done and no more records are coming. + return multierror.New(writer.Close()...).Err() +} + +func (c pusherConsumer) newStorageWriter() PusherCloser { + if c.kafkaConfig.IngestionConcurrency == 0 { + return newSequentialStoragePusher(c.metrics.storagePusherMetrics, c.pusher, c.kafkaConfig.FallbackClientErrorSampleRate, c.logger) + } + + return newParallelStoragePusher( + c.metrics.storagePusherMetrics, + c.pusher, + c.kafkaConfig.FallbackClientErrorSampleRate, + c.kafkaConfig.IngestionConcurrency, + c.kafkaConfig.IngestionConcurrencyBatchSize, + c.logger, + ) } -func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req *mimirpb.WriteRequest) error { +func (c pusherConsumer) pushToStorage(ctx context.Context, tenantID string, req *mimirpb.WriteRequest, writer PusherCloser) error { spanLog, ctx := spanlogger.NewWithLogger(ctx, c.logger, "pusherConsumer.pushToStorage") defer spanLog.Finish() - processingStart := time.Now() - // Note that the implementation of the Pusher expects the tenantID to be in the context. ctx = user.InjectOrgID(ctx, tenantID) - err := c.pusher.PushToStorage(ctx, req) - c.processingTimeSeconds.Observe(time.Since(processingStart).Seconds()) - c.totalRequests.Inc() + err := writer.PushToStorage(ctx, req) + + return err +} + +// sequentialStoragePusher receives mimirpb.WriteRequest which are then pushed to the storage one by one. +type sequentialStoragePusher struct { + metrics *storagePusherMetrics + errorHandler *pushErrorHandler + + pusher Pusher +} +// newSequentialStoragePusher creates a new sequentialStoragePusher instance. +func newSequentialStoragePusher(metrics *storagePusherMetrics, pusher Pusher, sampleRate int64, logger log.Logger) sequentialStoragePusher { + return sequentialStoragePusher{ + metrics: metrics, + pusher: pusher, + errorHandler: newPushErrorHandler(metrics, util_log.NewSampler(sampleRate), logger), + } +} + +// PushToStorage implements the PusherCloser interface. +func (ssp sequentialStoragePusher) PushToStorage(ctx context.Context, wr *mimirpb.WriteRequest) error { + ssp.metrics.timeSeriesPerFlush.Observe(float64(len(wr.Timeseries))) + defer func(now time.Time) { + ssp.metrics.processingTime.WithLabelValues(requestContents(wr)).Observe(time.Since(now).Seconds()) + }(time.Now()) + + if err := ssp.pusher.PushToStorage(ctx, wr); ssp.errorHandler.IsServerError(ctx, err) { + return err + } + + return nil +} + +// Close implements the PusherCloser interface. +func (ssp sequentialStoragePusher) Close() []error { + return nil +} + +// parallelStoragePusher receives WriteRequest which are then pushed to the storage in parallel. +// The parallelism is two-tiered which means that we first parallelize by tenantID and then by series. +type parallelStoragePusher struct { + metrics *storagePusherMetrics + logger log.Logger + + // pushers is map["$tenant|$source"]*parallelStorageShards + pushers map[string]*parallelStorageShards + upstreamPusher Pusher + errorHandler *pushErrorHandler + numShards int + batchSize int +} + +// newParallelStoragePusher creates a new parallelStoragePusher instance. +func newParallelStoragePusher(metrics *storagePusherMetrics, pusher Pusher, sampleRate int64, numShards int, batchSize int, logger log.Logger) *parallelStoragePusher { + return ¶llelStoragePusher{ + logger: log.With(logger, "component", "parallel-storage-pusher"), + pushers: make(map[string]*parallelStorageShards), + upstreamPusher: pusher, + numShards: numShards, + errorHandler: newPushErrorHandler(metrics, util_log.NewSampler(sampleRate), logger), + batchSize: batchSize, + metrics: metrics, + } +} + +// PushToStorage implements the PusherCloser interface. +func (c parallelStoragePusher) PushToStorage(ctx context.Context, wr *mimirpb.WriteRequest) error { + userID, err := user.ExtractOrgID(ctx) if err != nil { - // Only return non-client errors; these will stop the processing of the current Kafka fetches and retry (possibly). - if !mimirpb.IsClientError(err) { - c.serverErrRequests.Inc() - return spanLog.Error(err) - } + level.Error(c.logger).Log("msg", "failed to extract tenant ID from context", "err", err) + } - c.clientErrRequests.Inc() + shards := c.shardsFor(userID, wr.Source) + return shards.ShardWriteRequest(ctx, wr) +} - // The error could be sampled or marked to be skipped in logs, so we check whether it should be - // logged before doing it. - if keep, reason := c.shouldLogClientError(ctx, err); keep { - if reason != "" { - err = fmt.Errorf("%w (%s)", err, reason) - } - // This error message is consistent with error message in Prometheus remote-write and OTLP handlers in distributors. - level.Warn(spanLog).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "user", tenantID, "insight", true, "err", err) +// Close implements the PusherCloser interface. +func (c parallelStoragePusher) Close() []error { + var errs multierror.MultiError + for _, p := range c.pushers { + errs.Add(p.Stop()) + } + clear(c.pushers) + return errs +} + +// shardsFor returns the parallelStorageShards for the given userID. Once created the same shards are re-used for the same userID. +// We create a shard for each tenantID to parallelize the writes. +func (c parallelStoragePusher) shardsFor(userID string, requestSource mimirpb.WriteRequest_SourceEnum) *parallelStorageShards { + // Construct the string inline so that it doesn't escape to the heap. Go doesn't escape strings that are used to only look up map keys. + // We can use "|" because that cannot be part of a tenantID in Mimir. + if p := c.pushers[userID+"|"+requestSource.String()]; p != nil { + return p + } + // Use the same hashing function that's used for stripes in the TSDB. That way we make use of the low-contention property of stripes. + hashLabels := labels.Labels.Hash + p := newParallelStorageShards(c.metrics, c.errorHandler, c.numShards, c.batchSize, batchingQueueCapacity, c.upstreamPusher, hashLabels) + c.pushers[userID+"|"+requestSource.String()] = p + return p +} + +type labelsHashFunc func(labels.Labels) uint64 + +// parallelStorageShards is a collection of shards that are used to parallelize the writes to the storage by series. +// Each series is hashed to a shard that contains its own batchingQueue. +type parallelStorageShards struct { + metrics *storagePusherMetrics + errorHandler *pushErrorHandler + + pusher Pusher + hashLabels labelsHashFunc + + numShards int + batchSize int + capacity int + + wg *sync.WaitGroup + shards []*batchingQueue +} + +type flushableWriteRequest struct { + // startedAt is the time when the first item was added to this request (timeseries or metadata). + startedAt time.Time + *mimirpb.WriteRequest + context.Context +} + +// newParallelStorageShards creates a new parallelStorageShards instance. +func newParallelStorageShards(metrics *storagePusherMetrics, errorHandler *pushErrorHandler, numShards int, batchSize int, capacity int, pusher Pusher, hashLabels labelsHashFunc) *parallelStorageShards { + p := ¶llelStorageShards{ + numShards: numShards, + pusher: pusher, + errorHandler: errorHandler, + hashLabels: hashLabels, + capacity: capacity, + metrics: metrics, + batchSize: batchSize, + wg: &sync.WaitGroup{}, + } + + p.start() + + return p +} + +// ShardWriteRequest hashes each time series in the write requests and sends them to the appropriate shard which is then handled by the current batchingQueue in that shard. +// ShardWriteRequest ignores SkipLabelNameValidation because that field is only used in the distributor and not in the ingester. +// ShardWriteRequest aborts the request if it encounters an error. +func (p *parallelStorageShards) ShardWriteRequest(ctx context.Context, request *mimirpb.WriteRequest) error { + var ( + builder labels.ScratchBuilder + nonCopiedLabels labels.Labels + ) + + for _, ts := range request.Timeseries { + mimirpb.FromLabelAdaptersOverwriteLabels(&builder, ts.Labels, &nonCopiedLabels) + shard := p.hashLabels(nonCopiedLabels) % uint64(p.numShards) + + if err := p.shards[shard].AddToBatch(ctx, request.Source, ts); err != nil { + return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err) + } + } + + // Push metadata to every shard in a round-robin fashion. + // Start from a random shard to avoid hotspots in the first few shards when there are not many metadata pieces in each request. + shard := rand.IntN(p.numShards) + for mdIdx := range request.Metadata { + if err := p.shards[shard].AddMetadataToBatch(ctx, request.Source, request.Metadata[mdIdx]); err != nil { + return fmt.Errorf("encountered a non-client error when ingesting; this error was for a previous write request for the same tenant: %w", err) } + shard++ + shard %= p.numShards } + + // We might have some data left in some of the queues in the shards, but they will be flushed eventually once Stop is called, and we're certain that no more data is coming. + // So far we didn't find any non-client errors that are worth aborting for. + // We'll call Close eventually and collect the rest. return nil } +// Stop stops all the shards and waits for them to finish. +func (p *parallelStorageShards) Stop() error { + var errs multierror.MultiError + + for _, shard := range p.shards { + errs.Add(shard.Close()) + } + + p.wg.Wait() + + return errs.Err() +} + +// start starts the shards, each in its own goroutine. +func (p *parallelStorageShards) start() { + shards := make([]*batchingQueue, p.numShards) + p.wg.Add(p.numShards) + + for i := range shards { + shards[i] = newBatchingQueue(p.capacity, p.batchSize, p.metrics.batchingQueueMetrics) + go p.run(shards[i]) + } + + p.shards = shards +} + +// run runs the batchingQueue for the shard. +func (p *parallelStorageShards) run(queue *batchingQueue) { + defer p.wg.Done() + defer queue.Done() + + for wr := range queue.Channel() { + p.metrics.batchAge.Observe(time.Since(wr.startedAt).Seconds()) + p.metrics.timeSeriesPerFlush.Observe(float64(len(wr.WriteRequest.Timeseries))) + processingStart := time.Now() + + err := p.pusher.PushToStorage(wr.Context, wr.WriteRequest) + + // The error handler needs to determine if this is a server error or not. + // If it is, we need to stop processing as the batch will be retried. When is not (client error), it'll log it, and we can continue processing. + p.metrics.processingTime.WithLabelValues(requestContents(wr.WriteRequest)).Observe(time.Since(processingStart).Seconds()) + if err != nil && p.errorHandler.IsServerError(wr.Context, err) { + queue.ErrorChannel() <- err + } + } +} + +func requestContents(request *mimirpb.WriteRequest) string { + switch { + case len(request.Timeseries) > 0 && len(request.Metadata) > 0: + return "timeseries_and_metadata" + case len(request.Timeseries) > 0: + return "timeseries" + case len(request.Metadata) > 0: + return "metadata" + default: + // This would be a bug, but at least we'd know. + return "empty" + } +} + +// pushErrorHandler filters out client errors and logs them. +// It only returns errors that are not client errors. +type pushErrorHandler struct { + metrics *storagePusherMetrics + clientErrSampler *util_log.Sampler + fallbackLogger log.Logger +} + +// newPushErrorHandler creates a new pushErrorHandler instance. +func newPushErrorHandler(metrics *storagePusherMetrics, clientErrSampler *util_log.Sampler, fallbackLogger log.Logger) *pushErrorHandler { + return &pushErrorHandler{ + metrics: metrics, + clientErrSampler: clientErrSampler, + fallbackLogger: fallbackLogger, + } +} + +// IsServerError returns whether the error is a server error or not, the context is used to extract the span from the trace. +// When the error is a server error, we'll add it to the span passed down in the context and return true to indicate that the we should stop processing. +// When it is a client error, we'll add it to the span and log it to stdout/stderr. +func (p *pushErrorHandler) IsServerError(ctx context.Context, err error) bool { + // For every request, we have to determine if it's a server error. + // For the sake of simplicity, let's increment the total requests counter here. + p.metrics.totalRequests.Inc() + + spanLog := spanlogger.FromContext(ctx, p.fallbackLogger) + if err == nil { + return false + } + + // Only return non-client errors; these will stop the processing of the current Kafka fetches and retry (possibly). + if !mimirpb.IsClientError(err) { + p.metrics.serverErrRequests.Inc() + _ = spanLog.Error(err) + return true + } + + p.metrics.clientErrRequests.Inc() + + // The error could be sampled or marked to be skipped in logs, so we check whether it should be + // logged before doing it. + if keep, reason := p.shouldLogClientError(ctx, err); keep { + if reason != "" { + err = fmt.Errorf("%w (%s)", err, reason) + } + + // This error message is consistent with error message in Prometheus remote-write and OTLP handlers in distributors. + level.Warn(spanLog).Log("msg", "detected a client error while ingesting write request (the request may have been partially ingested)", "insight", true, "err", err) + } + return false +} + // shouldLogClientError returns whether err should be logged. -func (c pusherConsumer) shouldLogClientError(ctx context.Context, err error) (bool, string) { +func (p *pushErrorHandler) shouldLogClientError(ctx context.Context, err error) (bool, string) { var optional middleware.OptionalLogging if !errors.As(err, &optional) { // If error isn't sampled yet, we wrap it into our sampler and try again. - err = c.fallbackClientErrSampler.WrapError(err) + err = p.clientErrSampler.WrapError(err) if !errors.As(err, &optional) { // We can get here if c.clientErrSampler is nil. return true, "" @@ -150,34 +475,127 @@ func (c pusherConsumer) shouldLogClientError(ctx context.Context, err error) (bo return optional.ShouldLog(ctx) } -// The passed context is expected to be cancelled after all items in records were fully processed and are ready -// to be released. This so to guaranty the release of resources associated with each parsedRecord context. -func (c pusherConsumer) unmarshalRequests(ctx context.Context, records []record, recC chan<- parsedRecord) { - defer close(recC) - done := ctx.Done() - - for _, rec := range records { - // rec.ctx contains the tracing baggage for this record, which we propagate down the call tree. - // Since rec.ctx cancellation is disjointed from the context passed to unmarshalRequests(), the context.AfterFunc below, - // fuses the two lifetimes together. - recCtx, cancelRecCtx := context.WithCancelCause(rec.ctx) - context.AfterFunc(ctx, func() { - cancelRecCtx(context.Cause(ctx)) - }) - pRecord := parsedRecord{ - ctx: recCtx, - tenantID: rec.tenantID, - WriteRequest: &mimirpb.WriteRequest{}, - } - // We don't free the WriteRequest slices because they are being freed by the Pusher. - err := pRecord.WriteRequest.Unmarshal(rec.content) - if err != nil { - pRecord.err = fmt.Errorf("parsing ingest consumer write request: %w", err) +// batchingQueue is a queue that batches the incoming time series according to the batch size. +// Once the batch size is reached, the batch is pushed to a channel which can be accessed through the Channel() method. +type batchingQueue struct { + metrics *batchingQueueMetrics + + ch chan flushableWriteRequest + errCh chan error + done chan struct{} + + currentBatch flushableWriteRequest + batchSize int +} + +// newBatchingQueue creates a new batchingQueue instance. +func newBatchingQueue(capacity int, batchSize int, metrics *batchingQueueMetrics) *batchingQueue { + return &batchingQueue{ + metrics: metrics, + ch: make(chan flushableWriteRequest, capacity), + errCh: make(chan error, capacity+1), // We check errs before pushing to the channel, so we need to have a buffer of at least capacity+1 so that the consumer can push all of its errors and not rely on the producer to unblock it. + done: make(chan struct{}), + currentBatch: flushableWriteRequest{WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()}}, + batchSize: batchSize, + } +} + +// AddToBatch adds a time series to the current batch. If the batch size is reached, the batch is pushed to the Channel(). +// If an error occurs while pushing the batch, it returns the error and ensures the batch is pushed. +func (q *batchingQueue) AddToBatch(ctx context.Context, source mimirpb.WriteRequest_SourceEnum, ts mimirpb.PreallocTimeseries) error { + if q.currentBatch.startedAt.IsZero() { + q.currentBatch.startedAt = time.Now() + } + q.currentBatch.Timeseries = append(q.currentBatch.Timeseries, ts) + q.currentBatch.Context = ctx + q.currentBatch.Source = source + + return q.pushIfFull() +} + +// AddMetadataToBatch adds metadata to the current batch. +func (q *batchingQueue) AddMetadataToBatch(ctx context.Context, source mimirpb.WriteRequest_SourceEnum, metadata *mimirpb.MetricMetadata) error { + if q.currentBatch.startedAt.IsZero() { + q.currentBatch.startedAt = time.Now() + } + q.currentBatch.Metadata = append(q.currentBatch.Metadata, metadata) + q.currentBatch.Context = ctx + q.currentBatch.Source = source + + return q.pushIfFull() +} + +// Close closes the batchingQueue, it'll push the current branch to the channel if it's not empty. +// and then close the channel. +func (q *batchingQueue) Close() error { + var errs multierror.MultiError + if len(q.currentBatch.Timeseries)+len(q.currentBatch.Metadata) > 0 { + if err := q.push(); err != nil { + errs.Add(err) } + } + + close(q.ch) + <-q.done + + errs = append(errs, q.collectErrors()...) + close(q.errCh) + return errs.Err() +} + +// Channel returns the channel where the batches are pushed. +func (q *batchingQueue) Channel() <-chan flushableWriteRequest { + return q.ch +} + +// ErrorChannel returns the channel where errors are pushed. +func (q *batchingQueue) ErrorChannel() chan<- error { + return q.errCh +} + +// Done signals the queue that there is no more data coming for both the channel and the error channel. +// It is necessary to ensure we don't close the channel before all the data is flushed. +func (q *batchingQueue) Done() { + close(q.done) +} + +func (q *batchingQueue) pushIfFull() error { + if len(q.currentBatch.Metadata)+len(q.currentBatch.Timeseries) >= q.batchSize { + return q.push() + } + return nil +} + +// push pushes the current batch to the channel and resets the current batch. +// It also collects any errors that might have occurred while pushing the batch. +func (q *batchingQueue) push() error { + errs := q.collectErrors() + + q.metrics.flushErrorsTotal.Add(float64(len(errs))) + q.metrics.flushTotal.Inc() + + q.ch <- q.currentBatch + q.resetCurrentBatch() + + return errs.Err() +} + +// resetCurrentBatch resets the current batch to an empty state. +func (q *batchingQueue) resetCurrentBatch() { + q.currentBatch = flushableWriteRequest{ + WriteRequest: &mimirpb.WriteRequest{Timeseries: mimirpb.PreallocTimeseriesSliceFromPool()}, + } +} + +func (q *batchingQueue) collectErrors() multierror.MultiError { + var errs multierror.MultiError + + for { select { - case <-done: - return - case recC <- pRecord: + case err := <-q.errCh: + errs.Add(err) + default: + return errs } } } diff --git a/pkg/storage/ingest/pusher_metrics.go b/pkg/storage/ingest/pusher_metrics.go new file mode 100644 index 00000000000..8b5b3d1ff07 --- /dev/null +++ b/pkg/storage/ingest/pusher_metrics.go @@ -0,0 +1,97 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package ingest + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// pusherConsumerMetrics holds the metrics for the pusherConsumer. +type pusherConsumerMetrics struct { + processingTimeSeconds prometheus.Observer + + storagePusherMetrics *storagePusherMetrics +} + +// newPusherConsumerMetrics creates a new pusherConsumerMetrics instance. +func newPusherConsumerMetrics(reg prometheus.Registerer) *pusherConsumerMetrics { + return &pusherConsumerMetrics{ + storagePusherMetrics: newStoragePusherMetrics(reg), + processingTimeSeconds: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_ingest_storage_reader_records_processing_time_seconds", + Help: "Time taken to process a batch of fetched records. Fetched records are effectively a set of WriteRequests read from Kafka.", + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 100, + NativeHistogramMinResetDuration: 1 * time.Hour, + Buckets: prometheus.DefBuckets, + }), + } +} + +// storagePusherMetrics holds the metrics for both the sequentialStoragePusher and the parallelStoragePusher. +type storagePusherMetrics struct { + // batchAge is not really important unless we're pushing many things at once, so it's only used as part of parallelStoragePusher. + batchAge prometheus.Histogram + processingTime *prometheus.HistogramVec + timeSeriesPerFlush prometheus.Histogram + batchingQueueMetrics *batchingQueueMetrics + clientErrRequests prometheus.Counter + serverErrRequests prometheus.Counter + totalRequests prometheus.Counter +} + +// newStoragePusherMetrics creates a new storagePusherMetrics instance. +func newStoragePusherMetrics(reg prometheus.Registerer) *storagePusherMetrics { + errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_records_failed_total", + Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.", + }, []string{"cause"}) + + return &storagePusherMetrics{ + batchingQueueMetrics: newBatchingQueueMetrics(reg), + batchAge: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_ingest_storage_reader_pusher_batch_age_seconds", + Help: "Age of the batch of samples that are being ingested by an ingestion shard. This is the time since adding the first sample to the batch. Higher values indicates that the batching queue is not processing fast enough or that the batches are not filling up fast enough.", + NativeHistogramBucketFactor: 1.1, + }), + processingTime: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_ingest_storage_reader_pusher_processing_time_seconds", + Help: "Time to ingest a batch of samples for timeseries or metadata by an ingestion shard. The 'batch_contents' label indicates the contents of the batch.", + NativeHistogramBucketFactor: 1.1, + }, []string{"content"}), + timeSeriesPerFlush: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_ingest_storage_reader_pusher_timeseries_per_flush", + Help: "Number of time series pushed in each batch to an ingestion shard. A lower number than -ingest-storage.kafka.ingestion-concurrency-batch-size indicates that shards are not filling up and may not be parallelizing ingestion as efficiently.", + NativeHistogramBucketFactor: 1.1, + }), + clientErrRequests: errRequestsCounter.WithLabelValues("client"), + serverErrRequests: errRequestsCounter.WithLabelValues("server"), + totalRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_records_total", + Help: "Number of attempted records (write requests).", + }), + } +} + +// batchingQueueMetrics holds the metrics for the batchingQueue. +type batchingQueueMetrics struct { + flushTotal prometheus.Counter + flushErrorsTotal prometheus.Counter +} + +// newBatchingQueueMetrics creates a new batchingQueueMetrics instance. +func newBatchingQueueMetrics(reg prometheus.Registerer) *batchingQueueMetrics { + return &batchingQueueMetrics{ + flushTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_batching_queue_flush_total", + Help: "Number of times a batch of samples is flushed to the storage.", + }), + flushErrorsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingest_storage_reader_batching_queue_flush_errors_total", + Help: "Number of errors encountered while flushing a batch of samples to the storage.", + }), + } +} diff --git a/pkg/storage/ingest/pusher_test.go b/pkg/storage/ingest/pusher_test.go index 626cc9a1d0b..66820de2484 100644 --- a/pkg/storage/ingest/pusher_test.go +++ b/pkg/storage/ingest/pusher_test.go @@ -6,19 +6,24 @@ import ( "context" "fmt" "strings" + "sync" "testing" + "time" "github.com/go-kit/log" "github.com/gogo/status" - "github.com/grafana/dskit/cancellation" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/tenant" + "github.com/grafana/dskit/user" "github.com/grafana/regexp" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "google.golang.org/grpc/codes" "github.com/grafana/mimir/pkg/mimirpb" @@ -27,6 +32,10 @@ import ( type pusherFunc func(context.Context, *mimirpb.WriteRequest) error +func (p pusherFunc) Close() []error { + return nil +} + func (p pusherFunc) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error { return p(ctx, request) } @@ -34,7 +43,7 @@ func (p pusherFunc) PushToStorage(ctx context.Context, request *mimirpb.WriteReq func TestPusherConsumer(t *testing.T) { const tenantID = "t1" writeReqs := []*mimirpb.WriteRequest{ - {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1"), mockPreallocTimeseries("series_2")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1")}}, {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3")}}, {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_4")}}, {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_5")}}, @@ -155,8 +164,8 @@ func TestPusherConsumer(t *testing.T) { expectedWRs: writeReqs[0:3], expErr: "", // since all fof those were client errors, we don't return an error expectedLogLines: []string{ - "method=pusherConsumer.pushToStorage level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" user=t1 insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"", - "method=pusherConsumer.pushToStorage level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" user=t1 insight=true err=\"rpc error: code = Unknown desc = ingester test error\"", + "user=t1 level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"", + "user=t1 level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" insight=true err=\"rpc error: code = Unknown desc = ingester test error\"", }, }, "ingester server error": { @@ -174,19 +183,19 @@ func TestPusherConsumer(t *testing.T) { expectedWRs: writeReqs[0:2], // the rest of the requests are not attempted expErr: "ingester internal error", expectedLogLines: []string{ - "method=pusherConsumer.pushToStorage level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" user=t1 insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"", + "user=t1 level=warn msg=\"detected a client error while ingesting write request (the request may have been partially ingested)\" insight=true err=\"rpc error: code = InvalidArgument desc = ingester test error\"", }, }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - receivedReqs := 0 + receivedReqs := atomic.NewInt64(0) pusher := pusherFunc(func(ctx context.Context, request *mimirpb.WriteRequest) error { - defer func() { receivedReqs++ }() - require.GreaterOrEqualf(t, len(tc.expectedWRs), receivedReqs+1, "received more requests (%d) than expected (%d)", receivedReqs+1, len(tc.expectedWRs)) + reqIdx := int(receivedReqs.Inc() - 1) + require.GreaterOrEqualf(t, len(tc.expectedWRs), reqIdx+1, "received more requests (%d) than expected (%d)", reqIdx+1, len(tc.expectedWRs)) - expectedWR := tc.expectedWRs[receivedReqs] + expectedWR := tc.expectedWRs[reqIdx] for i, ts := range request.Timeseries { assert.Truef(t, ts.Equal(expectedWR.Timeseries[i].TimeSeries), "timeseries %d not equal; got %v, expected %v", i, ts, expectedWR.Timeseries[i].TimeSeries) } @@ -195,12 +204,13 @@ func TestPusherConsumer(t *testing.T) { assert.NoError(t, err) assert.Equal(t, tenantID, actualTenantID) - return tc.responses[receivedReqs].err + return tc.responses[reqIdx].err }) logs := &concurrency.SyncBuffer{} - c := newPusherConsumer(pusher, nil, prometheus.NewPedanticRegistry(), log.NewLogfmtLogger(logs)) - err := c.consume(context.Background(), tc.records) + metrics := newPusherConsumerMetrics(prometheus.NewPedanticRegistry()) + c := newPusherConsumer(pusher, KafkaConfig{}, metrics, log.NewLogfmtLogger(logs)) + err := c.Consume(context.Background(), tc.records) if tc.expErr == "" { assert.NoError(t, err) } else { @@ -217,19 +227,19 @@ func TestPusherConsumer(t *testing.T) { } } -var unimportantLogFieldsPattern = regexp.MustCompile(`\scaller=\S+\.go:\d+\s`) +var unimportantLogFieldsPattern = regexp.MustCompile(`(\s?)caller=\S+\.go:\d+\s`) func removeUnimportantLogFields(lines []string) []string { // The 'caller' field is not important to these tests (we just care about the message and other information), // and can change as we refactor code, making these tests brittle. So we remove it before making assertions about the log lines. for i, line := range lines { - lines[i] = unimportantLogFieldsPattern.ReplaceAllString(line, " ") + lines[i] = unimportantLogFieldsPattern.ReplaceAllString(line, "$1") } return lines } -func TestPusherConsumer_clientErrorSampling(t *testing.T) { +func TestPushErrorHandler_IsServerError(t *testing.T) { type testCase struct { sampler *util_log.Sampler err error @@ -269,7 +279,7 @@ func TestPusherConsumer_clientErrorSampling(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - c := newPusherConsumer(nil, tc.sampler, prometheus.NewPedanticRegistry(), log.NewNopLogger()) + c := newPushErrorHandler(newStoragePusherMetrics(prometheus.NewPedanticRegistry()), tc.sampler, log.NewNopLogger()) sampled, reason := c.shouldLogClientError(context.Background(), tc.err) assert.Equal(t, tc.expectedSampled, sampled) @@ -298,7 +308,7 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin reg := prometheus.NewPedanticRegistry() logs := &concurrency.SyncBuffer{} - consumer := newPusherConsumer(pusher, nil, reg, log.NewLogfmtLogger(logs)) + consumer := newPusherConsumer(pusher, KafkaConfig{}, newPusherConsumerMetrics(reg), log.NewLogfmtLogger(logs)) return consumer, logs, reg } @@ -308,7 +318,7 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin consumer, logs, reg := setupTest(pusherErr) // Should return no error on client errors. - require.NoError(t, consumer.consume(context.Background(), []record{reqRecord})) + require.NoError(t, consumer.Consume(context.Background(), []record{reqRecord})) assert.Contains(t, logs.String(), pusherErr.Error()) assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` @@ -330,7 +340,7 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin consumer, logs, reg := setupTest(pusherErr) // Should return no error on client errors. - require.NoError(t, consumer.consume(context.Background(), []record{reqRecord})) + require.NoError(t, consumer.Consume(context.Background(), []record{reqRecord})) assert.Contains(t, logs.String(), fmt.Sprintf("%s (sampled 1/100)", pusherErr.Error())) assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` @@ -351,7 +361,7 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin consumer, logs, reg := setupTest(pusherErr) // Should return no error on client errors. - require.NoError(t, consumer.consume(context.Background(), []record{reqRecord})) + require.NoError(t, consumer.Consume(context.Background(), []record{reqRecord})) assert.Empty(t, logs.String()) assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` @@ -364,48 +374,768 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin } -func TestPusherConsumer_consume_ShouldHonorContextCancellation(t *testing.T) { - // Create a request that will be used in this test; the content doesn't matter, - // since we only test errors. - req := &mimirpb.WriteRequest{Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1")}} - reqBytes, err := req.Marshal() - require.NoError(t, err) +// ingesterError mimics how the ingester construct errors +func ingesterError(cause mimirpb.ErrorCause, statusCode codes.Code, message string) error { + errorDetails := &mimirpb.ErrorDetails{Cause: cause} + statWithDetails, err := status.New(statusCode, message).WithDetails(errorDetails) + if err != nil { + panic(err) + } + return statWithDetails.Err() +} - reqRecord := record{ - ctx: context.Background(), // The record's context isn't important for the test. - tenantID: "user-1", - content: reqBytes, +type mockPusher struct { + mock.Mock +} + +func (m *mockPusher) PushToStorage(ctx context.Context, request *mimirpb.WriteRequest) error { + args := m.Called(ctx, request) + return args.Error(0) +} + +func TestParallelStorageShards_ShardWriteRequest(t *testing.T) { + testCases := map[string]struct { + shardCount int + batchSize int + requests []*mimirpb.WriteRequest + expectedErrs []error + expectedErrsCount int + + expectedUpstreamPushes []*mimirpb.WriteRequest + upstreamPushErrs []error + expectedCloseErr error + }{ + "push to a single shard and fill exactly capacity": { + shardCount: 1, + batchSize: 2, + requests: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1_1")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_2_1")}}, + }, + expectedErrs: []error{nil, nil}, + + expectedUpstreamPushes: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_1_1"), + mockPreallocTimeseries("series_2_1"), + }}, + }, + upstreamPushErrs: []error{nil}, + expectedCloseErr: nil, + }, + "push to multiple shards and fill exact capacity": { + shardCount: 2, + batchSize: 2, + requests: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1_2")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_2_2")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_2")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_4_2")}}, + }, + expectedErrs: []error{nil, nil, nil, nil}, + + expectedUpstreamPushes: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_1_2"), + mockPreallocTimeseries("series_3_2"), + }}, + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_2_2"), + mockPreallocTimeseries("series_4_2"), + }}, + }, + upstreamPushErrs: []error{nil, nil}, + expectedCloseErr: nil, + }, + "push to single shard and underfill capacity": { + shardCount: 1, + batchSize: 2, + requests: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1_3")}}, + }, + expectedErrs: []error{nil}, + + expectedUpstreamPushes: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1_3")}}, + }, + upstreamPushErrs: []error{nil}, + expectedCloseErr: nil, + }, + "push to single shard and overfill capacity": { + shardCount: 1, + batchSize: 2, + requests: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1_4")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_2_4")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_4")}}, + }, + expectedErrs: []error{nil, nil, nil}, + + expectedUpstreamPushes: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_1_4"), + mockPreallocTimeseries("series_2_4"), + }}, + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_3_4"), + }}, + }, + upstreamPushErrs: []error{nil, nil}, + expectedCloseErr: nil, + }, + "push to single shard and overfill only with the series of a singe request": { + shardCount: 1, + batchSize: 2, + requests: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_1_5"), + mockPreallocTimeseries("series_2_5"), + mockPreallocTimeseries("series_3_5"), + }}, + }, + expectedErrs: []error{nil}, + + expectedUpstreamPushes: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_1_5"), + mockPreallocTimeseries("series_2_5"), + }}, + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_3_5"), + }}, + }, + upstreamPushErrs: []error{nil, nil}, + expectedCloseErr: nil, + }, + "push to multiple shards and overfill capacity on one shard and underfill on another": { + shardCount: 2, + batchSize: 2, + requests: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1_6")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_2_6")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_6")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_6")}}, + }, + + expectedErrs: []error{nil, nil, nil, nil}, + expectedUpstreamPushes: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_1_6"), + mockPreallocTimeseries("series_3_6"), + }}, + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_2_6"), + }}, + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_3_6"), + }}, + }, + upstreamPushErrs: []error{nil, nil, nil}, + expectedCloseErr: nil, + }, + "push to single shard and get an error with an underfilled shard (i.e. when calling Close() on the Pusher)": { + shardCount: 1, + batchSize: 2, + requests: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1_7")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_2_7")}}, + }, + expectedErrs: []error{nil, nil}, + + expectedUpstreamPushes: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_1_7"), + mockPreallocTimeseries("series_2_7"), + }}, + }, + upstreamPushErrs: []error{assert.AnError}, + expectedCloseErr: assert.AnError, + }, + "push to single shard and get an error with an overfilled shard (i.e. during some of the pushes)": { + shardCount: 1, + batchSize: 2, + requests: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_1_8")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_2_8")}}, + + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}}, + + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}}, + + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}}, + {Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseries("series_3_8")}}, + }, + expectedErrsCount: 1, // at least one of those should fail because the first flush failed + + expectedUpstreamPushes: []*mimirpb.WriteRequest{ + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_1_8"), + mockPreallocTimeseries("series_2_8"), + }}, + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_3_8"), + mockPreallocTimeseries("series_3_8"), + }}, + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_3_8"), + mockPreallocTimeseries("series_3_8"), + }}, + {Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseries("series_3_8"), + mockPreallocTimeseries("series_3_8"), + }}, + }, + upstreamPushErrs: []error{assert.AnError, nil, nil, nil}, + expectedCloseErr: nil, + }, + + "push with metadata and exemplars": { + shardCount: 1, + batchSize: 3, + requests: []*mimirpb.WriteRequest{ + { + Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseriesWithExemplar("series_1_10")}, + Metadata: []*mimirpb.MetricMetadata{ + { + MetricFamilyName: "series_1_10", + Type: mimirpb.COUNTER, + Help: "A test counter", + Unit: "bytes", + }, + }, + }, + { + Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseriesWithExemplar("series_2_10")}, + Metadata: []*mimirpb.MetricMetadata{ + { + MetricFamilyName: "series_2_10", + Type: mimirpb.GAUGE, + Help: "A test gauge", + Unit: "seconds", + }, + }, + }, + }, + expectedErrs: []error{nil, nil}, + + expectedUpstreamPushes: []*mimirpb.WriteRequest{ + { + Timeseries: []mimirpb.PreallocTimeseries{ + mockPreallocTimeseriesWithExemplar("series_1_10"), + mockPreallocTimeseriesWithExemplar("series_2_10"), + }, + Metadata: []*mimirpb.MetricMetadata{ + { + MetricFamilyName: "series_1_10", + Type: mimirpb.COUNTER, + Help: "A test counter", + Unit: "bytes", + }, + }, + }, + { + Metadata: []*mimirpb.MetricMetadata{ + { + MetricFamilyName: "series_2_10", + Type: mimirpb.GAUGE, + Help: "A test gauge", + Unit: "seconds", + }, + }, + Timeseries: mimirpb.PreallocTimeseriesSliceFromPool(), + }, + }, + upstreamPushErrs: []error{nil, nil}, + expectedCloseErr: nil, + }, } - // didPush signals that the testing record was pushed to the pusher. - didPush := make(chan struct{}, 1) - pusher := pusherFunc(func(ctx context.Context, _ *mimirpb.WriteRequest) error { - close(didPush) - <-ctx.Done() - return context.Cause(ctx) - }) - consumer := newPusherConsumer(pusher, nil, prometheus.NewPedanticRegistry(), log.NewNopLogger()) + for name, tc := range testCases { + tc := tc + t.Run(name, func(t *testing.T) { + require.Equal(t, len(tc.expectedUpstreamPushes), len(tc.upstreamPushErrs)) + if len(tc.expectedErrs) != 0 && tc.expectedErrsCount > 0 { + require.Fail(t, "expectedErrs and expectedErrsCount are mutually exclusive") + } + if len(tc.expectedErrs) != 0 { + require.Equal(t, len(tc.expectedErrs), len(tc.requests)) + } + + pusher := &mockPusher{} + // run with a buffer of one, so some of the tests can fill the buffer and test the error handling + const buffer = 1 + metrics := newStoragePusherMetrics(prometheus.NewPedanticRegistry()) + errorHandler := newPushErrorHandler(metrics, nil, log.NewNopLogger()) + shardingP := newParallelStorageShards(metrics, errorHandler, tc.shardCount, tc.batchSize, buffer, pusher, labels.StableHash) + + for i, req := range tc.expectedUpstreamPushes { + pusher.On("PushToStorage", mock.Anything, req).Return(tc.upstreamPushErrs[i]) + } + var actualPushErrs []error + for _, req := range tc.requests { + err := shardingP.ShardWriteRequest(context.Background(), req) + actualPushErrs = append(actualPushErrs, err) + } + + if len(tc.expectedErrs) > 0 { + require.Equal(t, tc.expectedErrs, actualPushErrs) + } else { + receivedErrs := 0 + for _, err := range actualPushErrs { + if err != nil { + receivedErrs++ + } + } + require.Equalf(t, tc.expectedErrsCount, receivedErrs, "received %d errors instead of %d: %v", receivedErrs, tc.expectedErrsCount, actualPushErrs) + } + + closeErr := shardingP.Stop() + require.ErrorIs(t, closeErr, tc.expectedCloseErr) + pusher.AssertNumberOfCalls(t, "PushToStorage", len(tc.expectedUpstreamPushes)) + pusher.AssertExpectations(t) + }) + } +} + +func TestParallelStoragePusher(t *testing.T) { + type tenantWriteRequest struct { + tenantID string + *mimirpb.WriteRequest + } + + testCases := map[string]struct { + requests []tenantWriteRequest + expectedUpstreamPushes map[string]map[mimirpb.WriteRequest_SourceEnum]int + }{ + "separate tenants and sources": { + requests: []tenantWriteRequest{ + { + tenantID: "tenant1", + WriteRequest: &mimirpb.WriteRequest{ + Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseriesWithExemplar("series_1")}, + Source: mimirpb.API, + }, + }, + { + tenantID: "tenant1", + WriteRequest: &mimirpb.WriteRequest{ + Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseriesWithExemplar("series_2")}, + Source: mimirpb.RULE, + }, + }, + { + tenantID: "tenant2", + WriteRequest: &mimirpb.WriteRequest{ + Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseriesWithExemplar("series_3")}, + Source: mimirpb.API, + }, + }, + { + tenantID: "tenant2", + WriteRequest: &mimirpb.WriteRequest{ + Timeseries: []mimirpb.PreallocTimeseries{mockPreallocTimeseriesWithExemplar("series_4")}, + Source: mimirpb.RULE, + }, + }, + }, + expectedUpstreamPushes: map[string]map[mimirpb.WriteRequest_SourceEnum]int{ + "tenant1": { + mimirpb.API: 1, + mimirpb.RULE: 1, + }, + "tenant2": { + mimirpb.API: 1, + mimirpb.RULE: 1, + }, + }, + }, + "metadata-only requests": { + requests: []tenantWriteRequest{ + { + tenantID: "tenant1", + WriteRequest: &mimirpb.WriteRequest{ + Metadata: []*mimirpb.MetricMetadata{ + { + MetricFamilyName: "metric1", + Type: mimirpb.COUNTER, + Help: "A test counter", + Unit: "bytes", + }, + }, + Source: mimirpb.API, + }, + }, + { + tenantID: "tenant1", + WriteRequest: &mimirpb.WriteRequest{ + Metadata: []*mimirpb.MetricMetadata{ + { + MetricFamilyName: "metric2", + Type: mimirpb.GAUGE, + Help: "A test gauge", + Unit: "seconds", + }, + }, + Source: mimirpb.RULE, + }, + }, + { + tenantID: "tenant2", + WriteRequest: &mimirpb.WriteRequest{ + Metadata: []*mimirpb.MetricMetadata{ + { + MetricFamilyName: "metric3", + Type: mimirpb.HISTOGRAM, + Help: "A test histogram", + Unit: "bytes", + }, + }, + Source: mimirpb.API, + }, + }, + }, + expectedUpstreamPushes: map[string]map[mimirpb.WriteRequest_SourceEnum]int{ + "tenant1": { + mimirpb.API: 1, + mimirpb.RULE: 1, + }, + "tenant2": { + mimirpb.API: 1, + }, + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + pusher := &mockPusher{} + logger := log.NewNopLogger() - wantCancelErr := cancellation.NewErrorf("stop") + receivedPushes := make(map[string]map[mimirpb.WriteRequest_SourceEnum]int) + var receivedPushesMu sync.Mutex - // For this test, cancelling the top-most context must cancel an in-flight call to push, - // to prevent pusher from hanging forever. - canceledCtx, cancel := context.WithCancelCause(context.Background()) + pusher.On("PushToStorage", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + tenantID, err := tenant.TenantID(args.Get(0).(context.Context)) + require.NoError(t, err) + req := args.Get(1).(*mimirpb.WriteRequest) + + receivedPushesMu.Lock() + defer receivedPushesMu.Unlock() + + if receivedPushes[tenantID] == nil { + receivedPushes[tenantID] = make(map[mimirpb.WriteRequest_SourceEnum]int) + } + receivedPushes[tenantID][req.Source]++ + }).Return(nil) + + metrics := newStoragePusherMetrics(prometheus.NewPedanticRegistry()) + psp := newParallelStoragePusher(metrics, pusher, 0, 1, 1, logger) + + // Process requests + for _, req := range tc.requests { + ctx := user.InjectOrgID(context.Background(), req.tenantID) + err := psp.PushToStorage(ctx, req.WriteRequest) + require.NoError(t, err) + } + + // Close the pusher to flush any remaining data + errs := psp.Close() + require.Empty(t, errs) + + // Verify the received pushes + assert.Equal(t, tc.expectedUpstreamPushes, receivedPushes, "Mismatch in upstream pushes") + + pusher.AssertExpectations(t) + }) + } +} + +func TestBatchingQueue_NoDeadlock(t *testing.T) { + capacity := 2 + batchSize := 3 + reg := prometheus.NewPedanticRegistry() + m := newBatchingQueueMetrics(reg) + queue := newBatchingQueue(capacity, batchSize, m) + + ctx := context.Background() + series := mockPreallocTimeseries("series_1") + + // Start a goroutine to process the queue + var wg sync.WaitGroup + wg.Add(1) go func() { - <-didPush - cancel(wantCancelErr) + defer wg.Done() + defer queue.Done() + for range queue.Channel() { + // Simulate processing time + time.Sleep(50 * time.Millisecond) + queue.ErrorChannel() <- fmt.Errorf("mock error") + } }() - err = consumer.consume(canceledCtx, []record{reqRecord}) - require.ErrorIs(t, err, wantCancelErr) + // Add items to the queue + for i := 0; i < batchSize*(capacity+1); i++ { + require.NoError(t, queue.AddToBatch(ctx, mimirpb.API, series)) + } + + // Close the queue to signal no more items will be added + err := queue.Close() + require.ErrorContains(t, err, "mock error") + + wg.Wait() + + // Ensure the queue is empty and no deadlock occurred + require.Len(t, queue.ch, 0) + require.Len(t, queue.errCh, 0) + require.Len(t, queue.currentBatch.Timeseries, 0) + + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` +# HELP cortex_ingest_storage_reader_batching_queue_flush_errors_total Number of errors encountered while flushing a batch of samples to the storage. +# TYPE cortex_ingest_storage_reader_batching_queue_flush_errors_total counter +cortex_ingest_storage_reader_batching_queue_flush_errors_total 0 +# HELP cortex_ingest_storage_reader_batching_queue_flush_total Number of times a batch of samples is flushed to the storage. +# TYPE cortex_ingest_storage_reader_batching_queue_flush_total counter +cortex_ingest_storage_reader_batching_queue_flush_total 3 +`))) } -// ingesterError mimics how the ingester construct errors -func ingesterError(cause mimirpb.ErrorCause, statusCode codes.Code, message string) error { - errorDetails := &mimirpb.ErrorDetails{Cause: cause} - statWithDetails, err := status.New(statusCode, message).WithDetails(errorDetails) - if err != nil { - panic(err) +func TestBatchingQueue(t *testing.T) { + capacity := 5 + batchSize := 3 + + series1 := mockPreallocTimeseries("series_1") + series2 := mockPreallocTimeseries("series_2") + + series := []mimirpb.PreallocTimeseries{series1, series2} + + t.Run("batch not flushed because batch size is 3 and we have 2 items in the queue", func(t *testing.T) { + queue := setupQueue(t, capacity, batchSize, series) + + select { + case <-queue.Channel(): + t.Fatal("expected batch to not be flushed") + case <-time.After(100 * time.Millisecond): + } + }) + + t.Run("batch flushed because batch size is 3 and we have 3 items in the queue", func(t *testing.T) { + queue := setupQueue(t, capacity, batchSize, series) + + series3 := mockPreallocTimeseries("series_3") + require.NoError(t, queue.AddToBatch(context.Background(), mimirpb.API, series3)) + + select { + case batch := <-queue.Channel(): + require.Len(t, batch.WriteRequest.Timeseries, 3) + require.Equal(t, series1, batch.WriteRequest.Timeseries[0]) + require.Equal(t, series2, batch.WriteRequest.Timeseries[1]) + require.Equal(t, series3, batch.WriteRequest.Timeseries[2]) + case <-time.After(time.Second): + t.Fatal("expected batch to be flushed") + } + + // after the batch is flushed, the queue should be empty. + require.Len(t, queue.currentBatch.Timeseries, 0) + }) + + t.Run("if you close the queue with items in the queue, the queue should flush the items", func(t *testing.T) { + queue := setupQueue(t, capacity, batchSize, series) + + // Channel is empty. + select { + case <-queue.Channel(): + t.Fatal("expected batch to not be flushed") + case <-time.After(100 * time.Millisecond): + } + + // Read in a separate goroutine as when we close the queue, the channel will be closed. + var batch flushableWriteRequest + go func() { + defer queue.Done() + for b := range queue.Channel() { + batch = b + queue.ErrorChannel() <- nil + } + }() + + // Close the queue, and the items should be flushed. + require.NoError(t, queue.Close()) + + require.Len(t, batch.WriteRequest.Timeseries, 2) + require.Equal(t, series1, batch.WriteRequest.Timeseries[0]) + require.Equal(t, series2, batch.WriteRequest.Timeseries[1]) + }) + + t.Run("test queue capacity", func(t *testing.T) { + queue := setupQueue(t, capacity, batchSize, nil) + + // Queue channel is empty because there are only 2 items in the current currentBatch. + require.Len(t, queue.ch, 0) + require.Len(t, queue.currentBatch.Timeseries, 0) + + // Add items to the queue until it's full. + for i := 0; i < capacity*batchSize; i++ { + s := mockPreallocTimeseries(fmt.Sprintf("series_%d", i)) + require.NoError(t, queue.AddToBatch(context.Background(), mimirpb.API, s)) + } + + // We should have 5 items in the queue channel and 0 items in the currentBatch. + require.Len(t, queue.ch, 5) + require.Len(t, queue.currentBatch.Timeseries, 0) + + // Read one item to free up a queue space. + batch := <-queue.Channel() + require.Len(t, batch.WriteRequest.Timeseries, 3) + + // Queue should have 4 items now and the currentBatch remains the same. + require.Len(t, queue.ch, 4) + require.Len(t, queue.currentBatch.Timeseries, 0) + + // Add three more items to fill up the queue again, this shouldn't block. + s := mockPreallocTimeseries("series_100") + require.NoError(t, queue.AddToBatch(context.Background(), mimirpb.API, s)) + require.NoError(t, queue.AddToBatch(context.Background(), mimirpb.API, s)) + require.NoError(t, queue.AddToBatch(context.Background(), mimirpb.API, s)) + + require.Len(t, queue.ch, 5) + require.Len(t, queue.currentBatch.Timeseries, 0) + }) + + t.Run("metadata and exemplars are preserved in batches", func(t *testing.T) { + const ( + capacity = 2 + batchSize = 2 // 1 for metadata, 1 for a series + timestamp = 1234567890 + ) + queue := setupQueue(t, capacity, batchSize, nil) + + // Create a WriteRequest with metadata and a series with exemplars + timeSeries := mockPreallocTimeseries("series_1") + timeSeries.Exemplars = append(timeSeries.Exemplars, mimirpb.Exemplar{ + Value: 42.0, + TimestampMs: timestamp, + Labels: []mimirpb.LabelAdapter{{Name: "trace_id", Value: "abc123"}}, + }, + ) + + md := &mimirpb.MetricMetadata{ + Type: mimirpb.COUNTER, + MetricFamilyName: "test_counter", + Help: "A test counter", + Unit: "bytes", + } + + // Add timeseries with exemplars to the queue + require.NoError(t, queue.AddToBatch(context.Background(), mimirpb.API, timeSeries)) + + // Add metadata to the queue + require.NoError(t, queue.AddMetadataToBatch(context.Background(), mimirpb.API, md)) + + // Read the batch from the queue + select { + case batch, notExhausted := <-queue.Channel(): + require.True(t, notExhausted) + require.Len(t, batch.WriteRequest.Timeseries, 1) + require.Len(t, batch.WriteRequest.Metadata, 1) + + // Check that the first series in the batch has the correct exemplars + require.Len(t, batch.WriteRequest.Timeseries[0].TimeSeries.Exemplars, 1) + require.Equal(t, 42.0, batch.WriteRequest.Timeseries[0].TimeSeries.Exemplars[0].Value) + require.Equal(t, int64(timestamp), batch.WriteRequest.Timeseries[0].TimeSeries.Exemplars[0].TimestampMs) + require.Equal(t, "trace_id", batch.WriteRequest.Timeseries[0].TimeSeries.Exemplars[0].Labels[0].Name) + require.Equal(t, "abc123", batch.WriteRequest.Timeseries[0].TimeSeries.Exemplars[0].Labels[0].Value) + + // Check that the metadata in the batch is correct + require.Equal(t, mimirpb.COUNTER, batch.WriteRequest.Metadata[0].Type) + require.Equal(t, "test_counter", batch.WriteRequest.Metadata[0].MetricFamilyName) + require.Equal(t, "A test counter", batch.WriteRequest.Metadata[0].Help) + require.Equal(t, "bytes", batch.WriteRequest.Metadata[0].Unit) + + case <-time.After(time.Second): + t.Fatal("expected batch to be flushed") + } + }) +} + +func TestBatchingQueue_ErrorHandling(t *testing.T) { + capacity := 2 + batchSize := 2 + series1 := mockPreallocTimeseries("series_1") + series2 := mockPreallocTimeseries("series_2") + + t.Run("AddToBatch returns all errors and it pushes the batch when the batch is filled ", func(t *testing.T) { + queue := setupQueue(t, capacity, batchSize, nil) + ctx := context.Background() + + // Push 1 series so that the next push will complete the batch. + require.NoError(t, queue.AddToBatch(ctx, mimirpb.API, series2)) + + // Push an error to fill the error channel. + queue.ErrorChannel() <- fmt.Errorf("mock error 1") + queue.ErrorChannel() <- fmt.Errorf("mock error 2") + + // AddToBatch should return an error now. + err := queue.AddToBatch(ctx, mimirpb.API, series2) + assert.Equal(t, "2 errors: mock error 1; mock error 2", err.Error()) + // Also the batch was pushed. + select { + case batch := <-queue.Channel(): + require.Equal(t, series2, batch.WriteRequest.Timeseries[0]) + require.Equal(t, series2, batch.WriteRequest.Timeseries[1]) + default: + t.Fatal("expected batch to be flushed") + } + + // AddToBatch should work again. + require.NoError(t, queue.AddToBatch(ctx, mimirpb.API, series2)) + require.NoError(t, queue.AddToBatch(ctx, mimirpb.API, series2)) + }) + + t.Run("Any errors pushed after last AddToBatch call are received on Close", func(t *testing.T) { + queue := setupQueue(t, capacity, batchSize, nil) + ctx := context.Background() + + // Add a batch to a batch but make sure nothing is pushed., + require.NoError(t, queue.AddToBatch(ctx, mimirpb.API, series1)) + + select { + case <-queue.Channel(): + t.Fatal("expected batch to not be flushed") + default: + } + + // Push multiple errors + queue.ErrorChannel() <- fmt.Errorf("mock error 1") + queue.ErrorChannel() <- fmt.Errorf("mock error 2") + + // Close and Done on the queue. + queue.Done() + err := queue.Close() + require.Error(t, err) + assert.Equal(t, "2 errors: mock error 1; mock error 2", err.Error()) + + // Batch is also pushed. + select { + case batch := <-queue.Channel(): + require.Equal(t, series1, batch.WriteRequest.Timeseries[0]) + default: + t.Fatal("expected batch to be flushed") + } + }) +} + +func setupQueue(t *testing.T, capacity, batchSize int, series []mimirpb.PreallocTimeseries) *batchingQueue { + t.Helper() + + reg := prometheus.NewPedanticRegistry() + m := newBatchingQueueMetrics(reg) + queue := newBatchingQueue(capacity, batchSize, m) + + for _, s := range series { + require.NoError(t, queue.AddToBatch(context.Background(), mimirpb.API, s)) } - return statWithDetails.Err() + + return queue } diff --git a/pkg/storage/ingest/reader.go b/pkg/storage/ingest/reader.go index 1255b6d380b..b6744557f66 100644 --- a/pkg/storage/ingest/reader.go +++ b/pkg/storage/ingest/reader.go @@ -24,7 +24,6 @@ import ( "github.com/twmb/franz-go/plugin/kprom" "go.uber.org/atomic" - util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -58,7 +57,17 @@ type recordConsumer interface { // Consume consumes the given records in the order they are provided. We need this as samples that will be ingested, // are also needed to be in order to avoid ingesting samples out of order. // The function is expected to be idempotent and incremental, meaning that it can be called multiple times with the same records, and it won't respond to context cancellation. - consume(context.Context, []record) error + Consume(context.Context, []record) error +} + +type consumerFactory interface { + consumer() recordConsumer +} + +type consumerFactoryFunc func() recordConsumer + +func (c consumerFactoryFunc) consumer() recordConsumer { + return c() } type PartitionReader struct { @@ -73,8 +82,8 @@ type PartitionReader struct { client *kgo.Client fetcher fetcher - consumer recordConsumer - metrics readerMetrics + newConsumer consumerFactory + metrics readerMetrics committer *partitionCommitter @@ -88,15 +97,18 @@ type PartitionReader struct { } func NewPartitionReaderForPusher(kafkaCfg KafkaConfig, partitionID int32, instanceID string, pusher Pusher, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error) { - consumer := newPusherConsumer(pusher, util_log.NewSampler(kafkaCfg.FallbackClientErrorSampleRate), reg, logger) - return newPartitionReader(kafkaCfg, partitionID, instanceID, consumer, logger, reg) + metrics := newPusherConsumerMetrics(reg) + factory := consumerFactoryFunc(func() recordConsumer { + return newPusherConsumer(pusher, kafkaCfg, metrics, logger) + }) + return newPartitionReader(kafkaCfg, partitionID, instanceID, factory, logger, reg) } -func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID string, consumer recordConsumer, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error) { +func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID string, consumer consumerFactory, logger log.Logger, reg prometheus.Registerer) (*PartitionReader, error) { r := &PartitionReader{ kafkaCfg: kafkaCfg, partitionID: partitionID, - consumer: consumer, + newConsumer: consumer, consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID), metrics: newReaderMetrics(partitionID, reg), consumedOffsetWatcher: newPartitionOffsetWatcher(), @@ -478,16 +490,18 @@ func (r *PartitionReader) consumeFetches(ctx context.Context, fetches kgo.Fetche logger := spanlogger.FromContext(ctx, r.logger) for boff.Ongoing() { + // We instantiate the consumer on each iteration because it is stateful, and we can't reuse it after closing. + consumer := r.newConsumer.consumer() // If the PartitionReader is stopping and the ctx was cancelled, we don't want to interrupt the in-flight // processing midway. Instead, we let it finish, assuming it'll succeed. // If the processing fails while stopping, we log the error and let the backoff stop and bail out. // There is an edge-case when the processing gets stuck and doesn't let the stopping process. In such a case, // we expect the infrastructure (e.g. k8s) to eventually kill the process. consumeCtx := context.WithoutCancel(ctx) - err := r.consumer.consume(consumeCtx, records) + err := consumer.Consume(consumeCtx, records) if err == nil { level.Debug(logger).Log("msg", "closing consumer after successful consumption") - return nil + break } level.Error(logger).Log( "msg", "encountered error while ingesting data from Kafka; should retry", diff --git a/pkg/storage/ingest/reader_test.go b/pkg/storage/ingest/reader_test.go index 0e1facd726a..a9daf464de4 100644 --- a/pkg/storage/ingest/reader_test.go +++ b/pkg/storage/ingest/reader_test.go @@ -121,7 +121,7 @@ func TestPartitionReader_ConsumerError(t *testing.T) { consumer := consumerFunc(func(ctx context.Context, records []record) error { invocations.Inc() if !returnErrors.Load() { - return trackingConsumer.consume(ctx, records) + return trackingConsumer.Consume(ctx, records) } // There may be more records, but we only care that the one we failed to consume in the first place is still there. assert.Equal(t, "1", string(records[0].content)) @@ -1770,7 +1770,7 @@ func produceRecord(ctx context.Context, t *testing.T, writeClient *kgo.Client, t type readerTestCfg struct { kafka KafkaConfig partitionID int32 - consumer recordConsumer + consumer consumerFactory registry *prometheus.Registry logger log.Logger } @@ -1835,7 +1835,9 @@ func defaultReaderTestConfig(t *testing.T, addr string, topicName string, partit logger: testingLogger.WithT(t), kafka: createTestKafkaConfig(addr, topicName), partitionID: partitionID, - consumer: consumer, + consumer: consumerFactoryFunc(func() recordConsumer { + return consumer + }), } } @@ -1981,7 +1983,7 @@ func newTestConsumer(capacity int) testConsumer { } } -func (t testConsumer) consume(ctx context.Context, records []record) error { +func (t testConsumer) Consume(ctx context.Context, records []record) error { for _, r := range records { select { case <-ctx.Done(): @@ -2022,7 +2024,7 @@ func (t testConsumer) waitRecords(numRecords int, waitTimeout, drainPeriod time. type consumerFunc func(ctx context.Context, records []record) error -func (c consumerFunc) consume(ctx context.Context, records []record) error { +func (c consumerFunc) Consume(ctx context.Context, records []record) error { return c(ctx, records) } diff --git a/pkg/storage/ingest/writer_test.go b/pkg/storage/ingest/writer_test.go index ce904e644bf..ebf27fff98d 100644 --- a/pkg/storage/ingest/writer_test.go +++ b/pkg/storage/ingest/writer_test.go @@ -1036,6 +1036,27 @@ func mockPreallocTimeseries(metricName string) mimirpb.PreallocTimeseries { } } +func mockPreallocTimeseriesWithExemplar(metricName string) mimirpb.PreallocTimeseries { + return mimirpb.PreallocTimeseries{ + TimeSeries: &mimirpb.TimeSeries{ + Labels: []mimirpb.LabelAdapter{ + {Name: "__name__", Value: metricName}, + }, + Samples: []mimirpb.Sample{{ + TimestampMs: 1, + Value: 2, + }}, + Exemplars: []mimirpb.Exemplar{{ + TimestampMs: 2, + Value: 14, + Labels: []mimirpb.LabelAdapter{ + {Name: "trace_id", Value: metricName + "_trace"}, + }, + }}, + }, + } +} + func getProduceRequestRecordsCount(req *kmsg.ProduceRequest) (int, error) { count := 0