diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cd7776c55..64fa3fd564 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [CHANGE] Store Gateway: Add a new fastcache based inmemory index cache. #5619 * [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661 * [CHANGE] Ingester: Disable uploading compacted blocks and overlapping compaction in ingester. #5735 +* [CHANGE] Distributor: Count the number of rate-limited samples in `distributor_samples_in_total`. #5714 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477 * [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605 * [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 30e4d799d5..49d9446d68 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -583,21 +583,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co inflight := d.inflightPushRequests.Inc() defer d.inflightPushRequests.Dec() - if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) { - return nil, errTooManyInflightPushRequests - } - - if d.cfg.InstanceLimits.MaxIngestionRate > 0 { - if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate { - return nil, errMaxSamplesPushRateLimitReached - } - } - now := time.Now() d.activeUsers.UpdateUserTimestamp(userID, now) - removeReplica := false - numSamples := 0 numExemplars := 0 for _, ts := range req.Timeseries { @@ -610,6 +598,17 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co // Count the total number of metadata in. d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata))) + if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) { + return nil, errTooManyInflightPushRequests + } + + if d.cfg.InstanceLimits.MaxIngestionRate > 0 { + if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate { + return nil, errMaxSamplesPushRateLimitReached + } + } + + removeReplica := false // Cache user limit with overrides so we spend less CPU doing locking. See issue #4904 limits := d.limits.GetOverridesForUser(userID)