Skip to content

Commit

Permalink
Enhance cortex_ingester_inflight_push_requests metric (#6437)
Browse files Browse the repository at this point in the history
Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Dec 18, 2024
1 parent 27d68c2 commit d6f0d23
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
17 changes: 10 additions & 7 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,9 @@ type Ingester struct {
TSDBState TSDBState

// Rate of pushed samples. Only used by V2-ingester to limit global samples push rate.
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64
ingestionRate *util_math.EwmaRate
inflightPushRequests atomic.Int64
maxInflightPushRequests util_math.MaxTracker

inflightQueryRequests atomic.Int64
maxInflightQueryRequests util_math.MaxTracker
Expand Down Expand Up @@ -710,7 +711,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
cfg.ActiveSeriesMetricsEnabled,
i.getInstanceLimits,
i.ingestionRate,
&i.inflightPushRequests,
&i.maxInflightPushRequests,
&i.maxInflightQueryRequests,
cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled)
i.validateMetrics = validation.NewValidateMetrics(registerer)
Expand Down Expand Up @@ -792,7 +793,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
false,
i.getInstanceLimits,
nil,
&i.inflightPushRequests,
&i.maxInflightPushRequests,
&i.maxInflightQueryRequests,
cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled,
)
Expand Down Expand Up @@ -918,8 +919,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
metadataPurgeTicker := time.NewTicker(metadataPurgePeriod)
defer metadataPurgeTicker.Stop()

maxInflightRequestResetTicker := time.NewTicker(maxInflightRequestResetPeriod)
defer maxInflightRequestResetTicker.Stop()
maxTrackerResetTicker := time.NewTicker(maxInflightRequestResetPeriod)
defer maxTrackerResetTicker.Stop()

for {
select {
Expand All @@ -937,8 +938,9 @@ func (i *Ingester) updateLoop(ctx context.Context) error {

case <-activeSeriesTickerChan:
i.updateActiveSeries(ctx)
case <-maxInflightRequestResetTicker.C:
case <-maxTrackerResetTicker.C:
i.maxInflightQueryRequests.Tick()
i.maxInflightPushRequests.Tick()
case <-userTSDBConfigTicker.C:
i.updateUserTSDBConfigs()
case <-ctx.Done():
Expand Down Expand Up @@ -1068,6 +1070,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

// We will report *this* request in the error too.
inflight := i.inflightPushRequests.Inc()
i.maxInflightPushRequests.Track(inflight)
defer i.inflightPushRequests.Dec()

gl := i.getInstanceLimits()
Expand Down
11 changes: 6 additions & 5 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ingester
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -53,10 +52,12 @@ type ingesterMetrics struct {
maxUsersGauge prometheus.GaugeFunc
maxSeriesGauge prometheus.GaugeFunc
maxIngestionRate prometheus.GaugeFunc
ingestionRate prometheus.GaugeFunc
maxInflightPushRequests prometheus.GaugeFunc
inflightRequests prometheus.GaugeFunc
inflightQueryRequests prometheus.GaugeFunc

// Current Usage
ingestionRate prometheus.GaugeFunc
inflightRequests prometheus.GaugeFunc
inflightQueryRequests prometheus.GaugeFunc

// Posting Cache Metrics
expandedPostingsCacheMetrics *tsdb.ExpandedPostingsCacheMetrics
Expand All @@ -67,7 +68,7 @@ func newIngesterMetrics(r prometheus.Registerer,
activeSeriesEnabled bool,
instanceLimitsFn func() *InstanceLimits,
ingestionRate *util_math.EwmaRate,
inflightPushRequests *atomic.Int64,
inflightPushRequests *util_math.MaxTracker,
maxInflightQueryRequests *util_math.MaxTracker,
postingsCacheEnabled bool,
) *ingesterMetrics {
Expand Down
7 changes: 3 additions & 4 deletions pkg/ingester/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

util_math "github.com/cortexproject/cortex/pkg/util/math"
)

func TestIngesterMetrics(t *testing.T) {
mainReg := prometheus.NewPedanticRegistry()
ingestionRate := util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval)
inflightPushRequests := &atomic.Int64{}
inflightPushRequests := util_math.MaxTracker{}
maxInflightQueryRequests := util_math.MaxTracker{}
maxInflightQueryRequests.Track(98)
inflightPushRequests.Store(14)
inflightPushRequests.Track(14)

m := newIngesterMetrics(mainReg,
false,
Expand All @@ -33,7 +32,7 @@ func TestIngesterMetrics(t *testing.T) {
}
},
ingestionRate,
inflightPushRequests,
&inflightPushRequests,
&maxInflightQueryRequests,
false)

Expand Down

0 comments on commit d6f0d23

Please sign in to comment.