kafka replay speed: rename existing cortex_ingest_storage_reader_records_* metrics #9654

Merged
@@ -1107,7 +1107,12 @@ spec:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to consume write requests read from Kafka due to internal errors.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterfailstoprocessrecordsfromkafka
expr: |
sum by (cluster, namespace, pod) (rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])) > 0
sum by (cluster, namespace, pod) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])
or
rate(cortex_ingest_storage_reader_requests_failed_total{cause="server"}[1m])
) > 0
for: 5m
labels:
severity: critical
@@ -1117,7 +1122,12 @@ spec:
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterstuckprocessingrecordsfromkafka
expr: |
# Alert if the reader is not processing any records, but there are buffered records to process in the Kafka client.
(sum by (cluster, namespace, pod) (rate(cortex_ingest_storage_reader_records_total[5m])) == 0)
(sum by (cluster, namespace, pod) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_total[5m])
or
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
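The or pattern used throughout these alert expressions is what makes the rename safe to roll out: PromQL or returns every series from its left-hand side, plus those right-hand series whose label sets have no match on the left. Ingesters still exporting the old cortex_ingest_storage_reader_records_* names and upgraded ingesters exporting the new cortex_ingest_storage_reader_requests_* names therefore both feed the same alert. A minimal sketch of the pattern, with placeholder metric names:

    sum by (cluster, namespace, pod) (
      # Old metric name, still scraped from not-yet-upgraded instances.
      rate(my_old_metric_total[1m])
      or
      # New metric name; for an identical label set, the left-hand (old) series wins.
      rate(my_new_metric_total[1m])
    ) > 0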
14 changes: 12 additions & 2 deletions operations/mimir-mixin-compiled-baremetal/alerts.yaml
@@ -1081,7 +1081,12 @@ groups:
message: Mimir {{ $labels.instance }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to consume write requests read from Kafka due to internal errors.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterfailstoprocessrecordsfromkafka
expr: |
sum by (cluster, namespace, instance) (rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])) > 0
sum by (cluster, namespace, instance) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])
or
rate(cortex_ingest_storage_reader_requests_failed_total{cause="server"}[1m])
) > 0
for: 5m
labels:
severity: critical
@@ -1091,7 +1096,12 @@ groups:
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterstuckprocessingrecordsfromkafka
expr: |
# Alert if the reader is not processing any records, but there are buffered records to process in the Kafka client.
(sum by (cluster, namespace, instance) (rate(cortex_ingest_storage_reader_records_total[5m])) == 0)
(sum by (cluster, namespace, instance) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_total[5m])
or
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, instance) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
14 changes: 12 additions & 2 deletions operations/mimir-mixin-compiled/alerts.yaml
@@ -1095,7 +1095,12 @@ groups:
message: Mimir {{ $labels.pod }} in {{ $labels.cluster }}/{{ $labels.namespace }} fails to consume write requests read from Kafka due to internal errors.
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterfailstoprocessrecordsfromkafka
expr: |
sum by (cluster, namespace, pod) (rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])) > 0
sum by (cluster, namespace, pod) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])
or
rate(cortex_ingest_storage_reader_requests_failed_total{cause="server"}[1m])
) > 0
for: 5m
labels:
severity: critical
@@ -1105,7 +1110,12 @@ groups:
runbook_url: https://grafana.com/docs/mimir/latest/operators-guide/mimir-runbooks/#mimiringesterstuckprocessingrecordsfromkafka
expr: |
# Alert if the reader is not processing any records, but there are buffered records to process in the Kafka client.
(sum by (cluster, namespace, pod) (rate(cortex_ingest_storage_reader_records_total[5m])) == 0)
(sum by (cluster, namespace, pod) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_total[5m])
or
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (cluster, namespace, pod) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
14 changes: 12 additions & 2 deletions operations/mimir-mixin/alerts/ingest-storage.libsonnet
@@ -123,7 +123,12 @@
alert: $.alertName('IngesterFailsToProcessRecordsFromKafka'),
'for': '5m',
expr: |||
sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])) > 0
sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{cause="server"}[1m])
or
rate(cortex_ingest_storage_reader_requests_failed_total{cause="server"}[1m])
) > 0
||| % $._config,
labels: {
severity: 'critical',
@@ -139,7 +144,12 @@
'for': '5m',
expr: |||
# Alert if the reader is not processing any records, but there are buffered records to process in the Kafka client.
(sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (rate(cortex_ingest_storage_reader_records_total[5m])) == 0)
(sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_total[5m])
or
rate(cortex_ingest_storage_reader_requests_total[5m])
) == 0)
and
# NOTE: the cortex_ingest_storage_reader_buffered_fetch_records_total metric is a gauge showing the current number of buffered records.
(sum by (%(alert_aggregation_labels)s, %(per_instance_label)s) (cortex_ingest_storage_reader_buffered_fetch_records_total) > 0)
30 changes: 25 additions & 5 deletions operations/mimir-mixin/dashboards/writes.libsonnet
@@ -250,12 +250,32 @@ local filename = 'mimir-writes.json';
$.queryPanel(
[
|||
sum(rate(cortex_ingest_storage_reader_records_total{%s}[$__rate_interval]))
sum(
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_total{%s}[$__rate_interval])
or
rate(cortex_ingest_storage_reader_requests_total{%s}[$__rate_interval])
)
-
sum(rate(cortex_ingest_storage_reader_records_failed_total{%s}[$__rate_interval]))
||| % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
'sum (rate (cortex_ingest_storage_reader_records_failed_total{%s, cause="client"}[$__rate_interval]))' % [$.jobMatcher($._config.job_names.ingester)],
'sum (rate (cortex_ingest_storage_reader_records_failed_total{%s, cause="server"}[$__rate_interval]))' % [$.jobMatcher($._config.job_names.ingester)],
sum(
# This is the old metric name. We're keeping support for backward compatibility.
rate(cortex_ingest_storage_reader_records_failed_total{%s}[$__rate_interval])
or
rate(cortex_ingest_storage_reader_requests_failed_total{%s}[$__rate_interval])
)
||| % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
|||
  sum (
    # This is the old metric name. We're keeping support for backward compatibility.
    rate(cortex_ingest_storage_reader_records_failed_total{%s, cause="client"}[$__rate_interval])
    or
    rate(cortex_ingest_storage_reader_requests_failed_total{%s, cause="client"}[$__rate_interval])
  )
||| % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
|||
  sum (
    # This is the old metric name. We're keeping support for backward compatibility.
    rate(cortex_ingest_storage_reader_records_failed_total{%s, cause="server"}[$__rate_interval])
    or
    rate(cortex_ingest_storage_reader_requests_failed_total{%s, cause="server"}[$__rate_interval])
  )
||| % [$.jobMatcher($._config.job_names.ingester), $.jobMatcher($._config.job_names.ingester)],
],
[
'successful',
4 changes: 2 additions & 2 deletions pkg/storage/ingest/pusher.go
@@ -388,7 +388,7 @@ func (p *parallelStorageShards) run(queue *batchingQueue) {
// The error handler needs to determine if this is a server error or not.
// If it is, we need to stop processing as the batch will be retried. When it is not (a client error), it'll log it, and we can continue processing.
p.metrics.processingTime.WithLabelValues(requestContents(wr.WriteRequest)).Observe(time.Since(processingStart).Seconds())
if err != nil && p.errorHandler.IsServerError(wr.Context, err) {
if p.errorHandler.IsServerError(wr.Context, err) {
queue.ErrorChannel() <- err
}
}
@@ -433,10 +433,10 @@ func (p *pushErrorHandler) IsServerError(ctx context.Context, err error) bool {
// For the sake of simplicity, let's increment the total requests counter here.
p.metrics.totalRequests.Inc()

spanLog := spanlogger.FromContext(ctx, p.fallbackLogger)
if err == nil {
return false
}
spanLog := spanlogger.FromContext(ctx, p.fallbackLogger)

// Only return non-client errors; these will stop the processing of the current Kafka fetches and retry (possibly).
if !mimirpb.IsClientError(err) {
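The caller-side simplification above works because IsServerError now owns the nil check: it returns false for a nil error before constructing the span logger, so the caller's err != nil guard became redundant. A condensed sketch of the resulting flow, assuming the surrounding types from pusher.go (the logging call shown is illustrative; the real handler also applies optional per-error log sampling):

    // Callee: bail out on nil before doing any logging work.
    func (p *pushErrorHandler) IsServerError(ctx context.Context, err error) bool {
        p.metrics.totalRequests.Inc()
        if err == nil {
            return false
        }
        spanLog := spanlogger.FromContext(ctx, p.fallbackLogger)
        if mimirpb.IsClientError(err) {
            // Client errors (e.g. tenant limits, out-of-bounds samples) are
            // logged and processing continues.
            level.Warn(spanLog).Log("msg", "failed to push write request to storage", "err", err)
            return false
        }
        // Server errors stop processing so the current batch can be retried.
        return true
    }

    // Caller in run(): no separate nil check needed anymore.
    if p.errorHandler.IsServerError(wr.Context, err) {
        queue.ErrorChannel() <- err
    }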
8 changes: 4 additions & 4 deletions pkg/storage/ingest/pusher_metrics.go
@@ -46,8 +46,8 @@ type storagePusherMetrics struct {
// newStoragePusherMetrics creates a new storagePusherMetrics instance.
func newStoragePusherMetrics(reg prometheus.Registerer) *storagePusherMetrics {
errRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_records_failed_total",
Help: "Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.",
Name: "cortex_ingest_storage_reader_requests_failed_total",
Help: "Number of write requests which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.",
}, []string{"cause"})

return &storagePusherMetrics{
@@ -70,8 +70,8 @@ func newStoragePusherMetrics(reg prometheus.Registerer) *storagePusherMetrics {
clientErrRequests: errRequestsCounter.WithLabelValues("client"),
serverErrRequests: errRequestsCounter.WithLabelValues("server"),
totalRequests: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_reader_records_total",
Help: "Number of attempted records (write requests).",
Name: "cortex_ingest_storage_reader_requests_total",
Help: "Number of attempted write requests after batching records from Kafka.",
}),
}
}
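With the counters renamed, the server-error ratio of write-request processing can be computed from the new names; an illustrative query (not part of this diff):

    # Fraction of write requests failing with server errors over the last 5m.
      sum(rate(cortex_ingest_storage_reader_requests_failed_total{cause="server"}[5m]))
    /
      sum(rate(cortex_ingest_storage_reader_requests_total[5m]))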
54 changes: 37 additions & 17 deletions pkg/storage/ingest/pusher_test.go
@@ -322,11 +322,11 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin

assert.Contains(t, logs.String(), pusherErr.Error())
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_records_failed_total Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_records_failed_total counter
cortex_ingest_storage_reader_records_failed_total{cause="client"} 1
cortex_ingest_storage_reader_records_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_records_failed_total"))
# HELP cortex_ingest_storage_reader_requests_failed_total Number of write requests which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_requests_failed_total counter
cortex_ingest_storage_reader_requests_failed_total{cause="client"} 1
cortex_ingest_storage_reader_requests_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_requests_failed_total"))
})

t.Run("should log a client error if does implement optional logging interface and ShouldLog() returns true", func(t *testing.T) {
Expand All @@ -344,11 +344,11 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin

assert.Contains(t, logs.String(), fmt.Sprintf("%s (sampled 1/100)", pusherErr.Error()))
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_records_failed_total Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_records_failed_total counter
cortex_ingest_storage_reader_records_failed_total{cause="client"} 1
cortex_ingest_storage_reader_records_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_records_failed_total"))
# HELP cortex_ingest_storage_reader_requests_failed_total Number of write requests which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_requests_failed_total counter
cortex_ingest_storage_reader_requests_failed_total{cause="client"} 1
cortex_ingest_storage_reader_requests_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_requests_failed_total"))
})

t.Run("should not log a client error if does implement optional logging interface and ShouldLog() returns false", func(t *testing.T) {
Expand All @@ -365,11 +365,11 @@ func TestPusherConsumer_consume_ShouldLogErrorsHonoringOptionalLogging(t *testin

assert.Empty(t, logs.String())
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ingest_storage_reader_records_failed_total Number of records (write requests) which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_records_failed_total counter
cortex_ingest_storage_reader_records_failed_total{cause="client"} 1
cortex_ingest_storage_reader_records_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_records_failed_total"))
# HELP cortex_ingest_storage_reader_requests_failed_total Number of write requests which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_requests_failed_total counter
cortex_ingest_storage_reader_requests_failed_total{cause="client"} 1
cortex_ingest_storage_reader_requests_failed_total{cause="server"} 0
`), "cortex_ingest_storage_reader_requests_failed_total"))
})

}
@@ -666,12 +666,18 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
pusher := &mockPusher{}
// run with a buffer of one, so some of the tests can fill the buffer and test the error handling
const buffer = 1
metrics := newStoragePusherMetrics(prometheus.NewPedanticRegistry())
reg := prometheus.NewPedanticRegistry()
metrics := newStoragePusherMetrics(reg)
errorHandler := newPushErrorHandler(metrics, nil, log.NewNopLogger())
shardingP := newParallelStorageShards(metrics, errorHandler, tc.shardCount, tc.batchSize, buffer, pusher, labels.StableHash)

upstreamPushErrsCount := 0
for i, req := range tc.expectedUpstreamPushes {
pusher.On("PushToStorage", mock.Anything, req).Return(tc.upstreamPushErrs[i])
err := tc.upstreamPushErrs[i]
pusher.On("PushToStorage", mock.Anything, req).Return(err)
if err != nil {
upstreamPushErrsCount++
}
}
var actualPushErrs []error
for _, req := range tc.requests {
Expand All @@ -695,6 +701,20 @@ func TestParallelStorageShards_ShardWriteRequest(t *testing.T) {
require.ErrorIs(t, closeErr, tc.expectedCloseErr)
pusher.AssertNumberOfCalls(t, "PushToStorage", len(tc.expectedUpstreamPushes))
pusher.AssertExpectations(t)

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_ingest_storage_reader_requests_total Number of attempted write requests after batching records from Kafka.
# TYPE cortex_ingest_storage_reader_requests_total counter
cortex_ingest_storage_reader_requests_total %d
# HELP cortex_ingest_storage_reader_requests_failed_total Number of write requests which caused errors while processing. Client errors are errors such as tenant limits and samples out of bounds. Server errors indicate internal recoverable errors.
# TYPE cortex_ingest_storage_reader_requests_failed_total counter
cortex_ingest_storage_reader_requests_failed_total{cause="server"} %d
cortex_ingest_storage_reader_requests_failed_total{cause="client"} 0
`, len(tc.expectedUpstreamPushes), upstreamPushErrsCount)),
"cortex_ingest_storage_reader_requests_total",
"cortex_ingest_storage_reader_requests_failed_total",
),
)
})
}
}