Skip to content

Commit

Permalink
Merge pull request #739 from linxiulei/pb
Browse files Browse the repository at this point in the history
prom-to-sd: Prefer proto in content type
  • Loading branch information
CatherineF-dev authored Jul 17, 2024
2 parents e23f184 + f1fb117 commit 1324844
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 23 deletions.
32 changes: 24 additions & 8 deletions prometheus-to-sd/translator/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ limitations under the License.
package translator

import (
"bytes"
"crypto/tls"
"fmt"
"io/ioutil"
"io"
"net/http"
"strings"
"time"
Expand All @@ -34,7 +35,8 @@ const customMetricsPrefix = "custom.googleapis.com"

// PrometheusResponse represents unprocessed response from Prometheus endpoint.
type PrometheusResponse struct {
rawResponse string
rawResponse []byte
header http.Header
}

var prometheusClient *http.Client
Expand Down Expand Up @@ -71,21 +73,22 @@ func getPrometheusMetrics(config *config.SourceConfig) (*PrometheusResponse, err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body - %v", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("request failed - %q, response: %q", resp.Status, string(body))
}
return &PrometheusResponse{rawResponse: string(body)}, nil
return &PrometheusResponse{rawResponse: body, header: resp.Header}, nil
}

func doPrometheusRequest(url string, auth config.AuthConfig) (resp *http.Response, err error) {
request, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
request.Header.Set("Accept", string(expfmt.FmtProtoDelim))
if len(auth.Username) > 0 {
request.SetBasicAuth(auth.Username, auth.Password)
} else if len(auth.Token) > 0 {
Expand All @@ -96,11 +99,24 @@ func doPrometheusRequest(url string, auth config.AuthConfig) (resp *http.Respons

// Build performs parsing and processing of the prometheus metrics response.
func (p *PrometheusResponse) Build(config *config.CommonConfig, metricDescriptorCache *MetricDescriptorCache) (map[string]*dto.MetricFamily, error) {
parser := &expfmt.TextParser{}
metrics, err := parser.TextToMetricFamilies(strings.NewReader(p.rawResponse))
if err != nil {
return nil, err
format := expfmt.ResponseFormat(p.header)
if format == expfmt.FmtUnknown {
return nil, fmt.Errorf("failed to parse format from header: %s", p.header.Get("Content-Type"))
}
decoder := expfmt.NewDecoder(bytes.NewReader(p.rawResponse), format)
metrics := make(map[string]*dto.MetricFamily)
for {
metric := &dto.MetricFamily{}
err := decoder.Decode(metric)
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
metrics[metric.GetName()] = metric
}

if config.OmitComponentName {
metrics = OmitComponentName(metrics, config.SourceConfig.Component)
}
Expand Down
7 changes: 7 additions & 0 deletions prometheus-to-sd/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,13 @@ func convertToDistributionValue(h *dto.Histogram) *v3.Distribution {

prevVal := uint64(0)
lower := float64(0)
infSeen := false
for _, b := range h.Bucket {
upper := b.GetUpperBound()
if !math.IsInf(b.GetUpperBound(), 1) {
bounds = append(bounds, b.GetUpperBound())
} else {
infSeen = true
upper = lower
}
val := b.GetCumulativeCount() - prevVal
Expand All @@ -375,6 +377,11 @@ func convertToDistributionValue(h *dto.Histogram) *v3.Distribution {
prevVal = b.GetCumulativeCount()
}

// +Inf Bucket is implicit so it needs to be added
if !infSeen && count > int64(prevVal) {
values = append(values, count-int64(prevVal))
}

return &v3.Distribution{
Count: count,
Mean: mean,
Expand Down
28 changes: 13 additions & 15 deletions prometheus-to-sd/translator/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package translator

import (
"math"
"net/http"
"reflect"
"sort"
"strings"
Expand Down Expand Up @@ -82,7 +83,7 @@ var testLabelValue2 = "labelValue2"

var now = time.Now()

var metricsResponse = &PrometheusResponse{rawResponse: `
var metricsResponse = &PrometheusResponse{rawResponse: []byte(`
# TYPE test_name counter
test_name{labelName="labelValue1"} 42.0
test_name{labelName="labelValue2"} 106.0
Expand All @@ -105,8 +106,7 @@ test_histogram_sum 13.0
test_histogram_count 5
# TYPE untyped_metric untyped
untyped_metric 98.6
`,
}
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}

var metrics = map[string]*dto.MetricFamily{
testMetricName: {
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestTranslatePrometheusToStackdriverWithLabelFiltering(t *testing.T) {
}

func TestTranslateSummary(t *testing.T) {
var intSummaryMetricsResponse = &PrometheusResponse{rawResponse: `
var intSummaryMetricsResponse = &PrometheusResponse{rawResponse: []byte(`
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1234567890
# TYPE int_summary_metric summary
Expand All @@ -713,8 +713,8 @@ int_summary_metric{quantile="0.9"} 8
int_summary_metric{quantile="0.99"} 8
int_summary_metric_sum 42
int_summary_metric_count 101010
`}
var floatSummaryMetricsResponse = &PrometheusResponse{rawResponse: `
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}
var floatSummaryMetricsResponse = &PrometheusResponse{rawResponse: []byte(`
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1234567890
# TYPE float_summary_metric summary
Expand All @@ -723,8 +723,8 @@ float_summary_metric{quantile="0.9"} 8.123
float_summary_metric{quantile="0.99"} 8.123
float_summary_metric_sum 0.42
float_summary_metric_count 50
`}
var labelIntSummaryMetricsResponse = &PrometheusResponse{rawResponse: `
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}
var labelIntSummaryMetricsResponse = &PrometheusResponse{rawResponse: []byte(`
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1234567890
# TYPE int_summary_metric summary
Expand All @@ -738,7 +738,7 @@ int_summary_metric_sum{label="l1"} 7
int_summary_metric_sum{label="l2"} 8
int_summary_metric_count{label="l1"} 9
int_summary_metric_count{label="l2"} 10
`}
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}

type summaryTest struct {
description string
Expand Down Expand Up @@ -928,25 +928,23 @@ func createDoublePoint(d float64, start time.Time, end time.Time) *v3.Point {

func TestUpdateScrapes(t *testing.T) {
tsb := NewTimeSeriesBuilder(CommonConfigWithMetrics([]string{testMetricName, floatMetricName}), buildCacheForTesting())
scrape := &PrometheusResponse{rawResponse: `
scrape := &PrometheusResponse{rawResponse: []byte(`
# TYPE test_name counter
test_name{labelName="labelValue1"} 42.0
test_name{labelName="labelValue2"} 106.0
# TYPE float_metric counter
float_metric 123.17
# TYPE test_name counter
process_start_time_seconds 1234567890.0
`,
}
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}
tsb.Update(scrape, now)
scrape = &PrometheusResponse{rawResponse: `
scrape = &PrometheusResponse{rawResponse: []byte(`
# TYPE test_name counter
test_name{labelName="labelValue1"} 42.0
test_name{labelName="labelValue2"} 601.0
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1234567890.0
`,
}
`), header: http.Header{"Content-Type": []string{"text/plain; version=0.0.4; charset=UTF-8"}}}
tsb.Update(scrape, now)
ts, timestamp, err := tsb.Build()
assert.Equal(t, timestamp, now)
Expand Down

0 comments on commit 1324844

Please sign in to comment.