Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prom-to-sd: Prefer proto in content type #739

Merged
merged 2 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions prometheus-to-sd/translator/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ limitations under the License.
package translator

import (
"bytes"
"crypto/tls"
"fmt"
"io/ioutil"
"io"
"net/http"
"strings"
"time"
Expand All @@ -34,7 +35,8 @@ const customMetricsPrefix = "custom.googleapis.com"

// PrometheusResponse represents unprocessed response from Prometheus endpoint.
type PrometheusResponse struct {
rawResponse string
rawResponse []byte
header http.Header
}

var prometheusClient *http.Client
Expand Down Expand Up @@ -71,21 +73,22 @@ func getPrometheusMetrics(config *config.SourceConfig) (*PrometheusResponse, err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body - %v", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("request failed - %q, response: %q", resp.Status, string(body))
}
return &PrometheusResponse{rawResponse: string(body)}, nil
return &PrometheusResponse{rawResponse: body, header: resp.Header}, nil
}

func doPrometheusRequest(url string, auth config.AuthConfig) (resp *http.Response, err error) {
request, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
request.Header.Set("Accept", string(expfmt.FmtProtoDelim))
if len(auth.Username) > 0 {
request.SetBasicAuth(auth.Username, auth.Password)
} else if len(auth.Token) > 0 {
Expand All @@ -96,11 +99,24 @@ func doPrometheusRequest(url string, auth config.AuthConfig) (resp *http.Respons

// Build performs parsing and processing of the prometheus metrics response.
func (p *PrometheusResponse) Build(config *config.CommonConfig, metricDescriptorCache *MetricDescriptorCache) (map[string]*dto.MetricFamily, error) {
parser := &expfmt.TextParser{}
metrics, err := parser.TextToMetricFamilies(strings.NewReader(p.rawResponse))
if err != nil {
return nil, err
format := expfmt.ResponseFormat(p.header)
if format == expfmt.FmtUnknown {
return nil, fmt.Errorf("failed to parse format from header: %s", p.header.Get("Content-Type"))
}
decoder := expfmt.NewDecoder(bytes.NewReader(p.rawResponse), format)
metrics := make(map[string]*dto.MetricFamily)
for {
metric := &dto.MetricFamily{}
err := decoder.Decode(metric)
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
metrics[metric.GetName()] = metric
}

if config.OmitComponentName {
metrics = OmitComponentName(metrics, config.SourceConfig.Component)
}
Expand Down
7 changes: 7 additions & 0 deletions prometheus-to-sd/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,13 @@ func convertToDistributionValue(h *dto.Histogram) *v3.Distribution {

prevVal := uint64(0)
lower := float64(0)
infSeen := false
for _, b := range h.Bucket {
upper := b.GetUpperBound()
if !math.IsInf(b.GetUpperBound(), 1) {
bounds = append(bounds, b.GetUpperBound())
} else {
infSeen = true
upper = lower
}
val := b.GetCumulativeCount() - prevVal
Expand All @@ -375,6 +377,11 @@ func convertToDistributionValue(h *dto.Histogram) *v3.Distribution {
prevVal = b.GetCumulativeCount()
}

// +Inf Bucket is implicit so it needs to be added
if !infSeen && count > int64(prevVal) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this is addressing a different issue, could you add some background in the PR description?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not a different issue because the +inf bucket is omitted in protobuf encoding but text encoding handles that specially (see https://github.com/prometheus/common/blob/main/expfmt/text_create.go#L232 if interested). However, it makes sense to separate the changes with description, for which I split the changes into 2 commits to make it clear.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! LGTM-)

values = append(values, count-int64(prevVal))
}

return &v3.Distribution{
Count: count,
Mean: mean,
Expand Down
28 changes: 13 additions & 15 deletions prometheus-to-sd/translator/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package translator

import (
"math"
"net/http"
"reflect"
"sort"
"strings"
Expand Down Expand Up @@ -82,7 +83,7 @@ var testLabelValue2 = "labelValue2"

var now = time.Now()

var metricsResponse = &PrometheusResponse{rawResponse: `
var metricsResponse = &PrometheusResponse{rawResponse: []byte(`
# TYPE test_name counter
test_name{labelName="labelValue1"} 42.0
test_name{labelName="labelValue2"} 106.0
Expand All @@ -105,8 +106,7 @@ test_histogram_sum 13.0
test_histogram_count 5
# TYPE untyped_metric untyped
untyped_metric 98.6
`,
}
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}

var metrics = map[string]*dto.MetricFamily{
testMetricName: {
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestTranslatePrometheusToStackdriverWithLabelFiltering(t *testing.T) {
}

func TestTranslateSummary(t *testing.T) {
var intSummaryMetricsResponse = &PrometheusResponse{rawResponse: `
var intSummaryMetricsResponse = &PrometheusResponse{rawResponse: []byte(`
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1234567890
# TYPE int_summary_metric summary
Expand All @@ -713,8 +713,8 @@ int_summary_metric{quantile="0.9"} 8
int_summary_metric{quantile="0.99"} 8
int_summary_metric_sum 42
int_summary_metric_count 101010
`}
var floatSummaryMetricsResponse = &PrometheusResponse{rawResponse: `
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}
var floatSummaryMetricsResponse = &PrometheusResponse{rawResponse: []byte(`
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1234567890
# TYPE float_summary_metric summary
Expand All @@ -723,8 +723,8 @@ float_summary_metric{quantile="0.9"} 8.123
float_summary_metric{quantile="0.99"} 8.123
float_summary_metric_sum 0.42
float_summary_metric_count 50
`}
var labelIntSummaryMetricsResponse = &PrometheusResponse{rawResponse: `
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}
var labelIntSummaryMetricsResponse = &PrometheusResponse{rawResponse: []byte(`
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1234567890
# TYPE int_summary_metric summary
Expand All @@ -738,7 +738,7 @@ int_summary_metric_sum{label="l1"} 7
int_summary_metric_sum{label="l2"} 8
int_summary_metric_count{label="l1"} 9
int_summary_metric_count{label="l2"} 10
`}
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}

type summaryTest struct {
description string
Expand Down Expand Up @@ -928,25 +928,23 @@ func createDoublePoint(d float64, start time.Time, end time.Time) *v3.Point {

func TestUpdateScrapes(t *testing.T) {
tsb := NewTimeSeriesBuilder(CommonConfigWithMetrics([]string{testMetricName, floatMetricName}), buildCacheForTesting())
scrape := &PrometheusResponse{rawResponse: `
scrape := &PrometheusResponse{rawResponse: []byte(`
# TYPE test_name counter
test_name{labelName="labelValue1"} 42.0
test_name{labelName="labelValue2"} 106.0
# TYPE float_metric counter
float_metric 123.17
# TYPE test_name counter
process_start_time_seconds 1234567890.0
`,
}
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}
tsb.Update(scrape, now)
scrape = &PrometheusResponse{rawResponse: `
scrape = &PrometheusResponse{rawResponse: []byte(`
# TYPE test_name counter
test_name{labelName="labelValue1"} 42.0
test_name{labelName="labelValue2"} 601.0
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1234567890.0
`,
}
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}
tsb.Update(scrape, now)
ts, timestamp, err := tsb.Build()
assert.Equal(t, timestamp, now)
Expand Down
Loading