Skip to content

Commit

Permalink
fix distributor sends native histograms
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Nov 27, 2023
1 parent 72ba1d5 commit c37a3f7
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
29 changes: 22 additions & 7 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
incomingMetadata: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_metadata_in_total",
Help: "The total number of metadata the have come in to the distributor, including rejected.",
Help: "The total number of metadata that have come in to the distributor, including rejected.",
}, []string{"user"}),
nonHASamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Expand Down Expand Up @@ -538,7 +538,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
for _, e := range ts.Exemplars {
if err := validation.ValidateExemplar(userID, ts.Labels, e); err != nil {
// An exemplar validation error prevents ingesting samples
// in the same series object. However because the current Prometheus
// in the same series object. However, because the current Prometheus
// remote write implementation only populates one or the other,
// there never will be any.
return emptyPreallocSeries, err
Expand All @@ -547,11 +547,23 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
}
}

var histograms []cortexpb.Histogram
if len(ts.Histograms) > 0 {
// Only alloc when data present
histograms = make([]cortexpb.Histogram, 0, len(ts.Histograms))
// TODO(yeya24): we need to have validations for native histograms
// at some point. Skip validations for now.
for _, h := range ts.Histograms {
histograms = append(histograms, h)
}
}

return cortexpb.PreallocTimeseries{
TimeSeries: &cortexpb.TimeSeries{
Labels: ts.Labels,
Samples: samples,
Exemplars: exemplars,
Labels: ts.Labels,
Samples: samples,
Exemplars: exemplars,
Histograms: histograms,
},
},
nil
Expand Down Expand Up @@ -589,10 +601,11 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
numSamples := 0
numExemplars := 0
for _, ts := range req.Timeseries {
numSamples += len(ts.Samples)
// Should we differentiate normal sample and histogram sample?
numSamples += len(ts.Samples) + len(ts.Histograms)
numExemplars += len(ts.Exemplars)
}
// Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
// Count the total samples, exemplars in, prior to validation or deduplication, for comparison with other metrics.
d.incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars))
// Count the total number of metadata in.
Expand Down Expand Up @@ -772,6 +785,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
if len(ts.Samples) > 0 {
latestSampleTimestampMs = util_math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
}
// TODO: use timestamp of the latest native histogram in the series as well.

if mrc := limits.MetricRelabelConfigs; len(mrc) > 0 {
l, _ := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
Expand Down Expand Up @@ -836,6 +850,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write

seriesKeys = append(seriesKeys, key)
validatedTimeseries = append(validatedTimeseries, validatedSeries)
// TODO(yeya24): add histogram samples as well when supported.
validatedSamples += len(ts.Samples)
validatedExemplars += len(ts.Exemplars)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="userA"} 1111
# HELP cortex_distributor_metadata_in_total The total number of metadata the have come in to the distributor, including rejected.
# HELP cortex_distributor_metadata_in_total The total number of metadata that have come in to the distributor, including rejected.
# TYPE cortex_distributor_metadata_in_total counter
cortex_distributor_metadata_in_total{user="userA"} 5
Expand Down

0 comments on commit c37a3f7

Please sign in to comment.