Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix distributor sends native histograms #5679

Merged
merged 1 commit into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
incomingMetadata: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_metadata_in_total",
Help: "The total number of metadata the have come in to the distributor, including rejected.",
Help: "The total number of metadata that have come in to the distributor, including rejected.",
}, []string{"user"}),
nonHASamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Expand Down Expand Up @@ -538,7 +538,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
for _, e := range ts.Exemplars {
if err := validation.ValidateExemplar(userID, ts.Labels, e); err != nil {
// An exemplar validation error prevents ingesting samples
// in the same series object. However because the current Prometheus
// in the same series object. However, because the current Prometheus
// remote write implementation only populates one or the other,
// there never will be any.
return emptyPreallocSeries, err
Expand All @@ -547,11 +547,23 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
}
}

var histograms []cortexpb.Histogram
if len(ts.Histograms) > 0 {
// Only alloc when data present
histograms = make([]cortexpb.Histogram, 0, len(ts.Histograms))
// TODO(yeya24): we need to have validations for native histograms
// at some point. Skip validations for now.
for _, h := range ts.Histograms {
histograms = append(histograms, h)
}
}

return cortexpb.PreallocTimeseries{
TimeSeries: &cortexpb.TimeSeries{
Labels: ts.Labels,
Samples: samples,
Exemplars: exemplars,
Labels: ts.Labels,
Samples: samples,
Exemplars: exemplars,
Histograms: histograms,
},
},
nil
Expand Down Expand Up @@ -589,10 +601,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
numSamples := 0
numExemplars := 0
for _, ts := range req.Timeseries {
numSamples += len(ts.Samples)
numSamples += len(ts.Samples) + len(ts.Histograms)
numExemplars += len(ts.Exemplars)
}
// Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
// Count the total samples, exemplars in, prior to validation or deduplication, for comparison with other metrics.
d.incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars))
// Count the total number of metadata in.
Expand Down Expand Up @@ -772,6 +784,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
if len(ts.Samples) > 0 {
latestSampleTimestampMs = util_math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
}
// TODO(yeya24): use timestamp of the latest native histogram in the series as well.

if mrc := limits.MetricRelabelConfigs; len(mrc) > 0 {
l, _ := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
Expand Down Expand Up @@ -836,6 +849,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write

seriesKeys = append(seriesKeys, key)
validatedTimeseries = append(validatedTimeseries, validatedSeries)
// TODO(yeya24): add histogram samples as well when supported.
validatedSamples += len(ts.Samples)
validatedExemplars += len(ts.Exemplars)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="userA"} 1111

# HELP cortex_distributor_metadata_in_total The total number of metadata the have come in to the distributor, including rejected.
# HELP cortex_distributor_metadata_in_total The total number of metadata that have come in to the distributor, including rejected.
# TYPE cortex_distributor_metadata_in_total counter
cortex_distributor_metadata_in_total{user="userA"} 5

Expand Down
Loading