Skip to content

Commit

Permalink
Change otlp attribute conversion to be consistent with prometheus (#6272
Browse files Browse the repository at this point in the history
)

Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 authored Nov 5, 2024
1 parent 3267515 commit 2e5488a
Show file tree
Hide file tree
Showing 10 changed files with 430 additions and 32 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## master / unreleased

* [CHANGE] OTLP: Change OTLP handler to be consistent with the Prometheus OTLP handler. #6272
- `target_info` metric is enabled by default and can be disabled via `-distributor.otlp.disable-target-info=true` flag
- Convert all attributes to labels is disabled by default and can be enabled via `-distributor.otlp.convert-all-attributes=true` flag
- You can specify the attributes converted to labels via `-distributor.promote-resource-attributes` flag. Supported only if `-distributor.otlp.convert-all-attributes=false`
* [CHANGE] Change all max async concurrency default values `50` to `3` #6268
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
Expand Down
15 changes: 15 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2674,6 +2674,16 @@ instance_limits:
# unlimited.
# CLI flag: -distributor.instance-limits.max-inflight-push-requests
[max_inflight_push_requests: <int> | default = 0]
otlp:
# If true, all resource attributes are converted to labels.
# CLI flag: -distributor.otlp.convert-all-attributes
[convert_all_attributes: <boolean> | default = false]
# If true, a target_info metric is not ingested. (refer to:
# https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)
# CLI flag: -distributor.otlp.disable-target-info
[disable_target_info: <boolean> | default = false]
```

### `etcd_config`
Expand Down Expand Up @@ -3319,6 +3329,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -validation.max-native-histogram-buckets
[max_native_histogram_buckets: <int> | default = 0]
# Comma separated list of resource attributes that should be converted to
# labels.
# CLI flag: -distributor.promote-resource-attributes
[promote_resource_attributes: <list of string> | default = ]
# The maximum number of active series per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-series-per-user
[max_series_per_user: <int> | default = 5000000]
Expand Down
10 changes: 6 additions & 4 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics
return metrics
}

func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()

// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
Expand All @@ -244,6 +244,9 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
resourceMetric.Resource().Attributes().PutStr("service.name", "test-service")
resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance")
resourceMetric.Resource().Attributes().PutStr("host.name", "test-host")
for _, label := range labels {
resourceMetric.Resource().Attributes().PutStr(label.Name, label.Value)
}

scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()

Expand All @@ -258,7 +261,6 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty()
counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
counterDataPoint.SetDoubleValue(10.0)
counterDataPoint.Attributes().PutStr("foo.bar", "baz")

counterExemplar := counterDataPoint.Exemplars().AppendEmpty()
counterExemplar.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
Expand All @@ -269,8 +271,8 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
return pmetricotlp.NewExportRequestFromMetrics(d)
}

func (c *Client) OTLPPushExemplar(name string) (*http.Response, error) {
data, err := otlpWriteRequest(name).MarshalProto()
func (c *Client) OTLPPushExemplar(name string, labels ...prompb.Label) (*http.Response, error) {
data, err := otlpWriteRequest(name, labels...).MarshalProto()
if err != nil {
return nil, err
}
Expand Down
110 changes: 110 additions & 0 deletions integration/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package integration

import (
"bytes"
"context"
"fmt"
"math/rand"
"path/filepath"
Expand All @@ -15,6 +17,7 @@ import (
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore/providers/s3"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
Expand Down Expand Up @@ -144,3 +147,110 @@ func TestOTLPIngestExemplar(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(exemplars))
}

func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) {
configFileName := "runtime-config.yaml"

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
minio := e2edb.NewMinio(9000, bucketName)
require.NoError(t, s.StartAndWaitReady(minio))

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-auth.enabled": "true",
"-runtime-config.backend": "s3",
"-runtime-config.s3.access-key-id": e2edb.MinioAccessKey,
"-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey,
"-runtime-config.s3.bucket-name": bucketName,
"-runtime-config.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
"-runtime-config.s3.insecure": "true",
"-runtime-config.file": configFileName,
"-runtime-config.reload-period": "1s",

// Distributor
"-distributor.otlp.convert-all-attributes": "false",
"-distributor.promote-resource-attributes": "attr1,attr2,attr3",

// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
"-alertmanager-storage.backend": "local",
"-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"),
})

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

client, err := s3.NewBucketWithConfig(nil, s3.Config{
Endpoint: minio.HTTPEndpoint(),
Insecure: true,
Bucket: bucketName,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
}, "runtime-config-test", nil)

require.NoError(t, err)

// update runtime config
newRuntimeConfig := []byte(`overrides:
user-1:
promote_resource_attributes: ["attr1"]
user-2:
promote_resource_attributes: ["attr1", "attr2"]
`)
require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig)))
time.Sleep(2 * time.Second)

require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-local.yaml", cortexConfigFile))

// start cortex and assert runtime-config is loaded correctly
cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex", cortexConfigFile, flags, "", 9009, 9095)
require.NoError(t, s.StartAndWaitReady(cortex))

c1, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

c2, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-2")
require.NoError(t, err)

c3, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-3")
require.NoError(t, err)

// Push some series to Cortex.
now := time.Now()

labels := []prompb.Label{
{Name: "service.name", Value: "test-service"},
{Name: "attr1", Value: "value"},
{Name: "attr2", Value: "value"},
{Name: "attr3", Value: "value"},
}

res, err := c1.OTLPPushExemplar("series_1", labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c2.OTLPPushExemplar("series_1", labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c3.OTLPPushExemplar("series_1", labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

labelSet1, err := c1.LabelNames(now.Add(-time.Minute*5), now, "series_1")
require.NoError(t, err)
require.Equal(t, labelSet1, []string{"__name__", "attr1", "instance", "job"})

labelSet2, err := c2.LabelNames(now.Add(-time.Minute*5), now, "series_1")
require.NoError(t, err)
require.Equal(t, labelSet2, []string{"__name__", "attr1", "attr2", "instance", "job"})

labelSet3, err := c3.LabelNames(now.Add(-time.Minute*5), now, "series_1")
require.NoError(t, err)
require.Equal(t, labelSet3, []string{"__name__", "attr1", "attr2", "attr3", "instance", "job"})
}
5 changes: 3 additions & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/push"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// DistributorPushWrapper wraps around a push. It is similar to middleware.Interface.
Expand Down Expand Up @@ -273,11 +274,11 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
}

// RegisterDistributor registers the endpoints associated with the distributor.
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config) {
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) {
}

func (t *Cortex) initDistributor() (serv services.Service, err error) {
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor)
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Overrides)

return nil, nil
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,21 @@ type Config struct {

// Limits for distributor
InstanceLimits InstanceLimits `yaml:"instance_limits"`

// OTLPConfig
OTLPConfig OTLPConfig `yaml:"otlp"`
}

type InstanceLimits struct {
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
}

type OTLPConfig struct {
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
DisableTargetInfo bool `yaml:"disable_target_info"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.PoolConfig.RegisterFlags(f)
Expand All @@ -188,6 +196,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")

f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.")
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)")
}

// Validate config and returns error on failure
Expand Down
72 changes: 52 additions & 20 deletions pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,50 @@
package push

import (
"context"
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
"github.com/prometheus/prometheus/util/annotations"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/log"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// OTLPHandler is a http.Handler which accepts OTLP metrics.
func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.WithContext(ctx, log.Logger)
logger := util_log.WithContext(ctx, util_log.Logger)
if sourceIPs != nil {
source := sourceIPs.Get(r)
if source != "" {
ctx = util.AddSourceIPsToOutgoingContext(ctx, source)
logger = log.WithSourceIPs(source, logger)
logger = util_log.WithSourceIPs(source, logger)
}
}
req, err := remote.DecodeOTLPWriteRequest(r)

userID, err := tenant.TenantID(ctx)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

promConverter := prometheusremotewrite.NewPrometheusConverter()
setting := prometheusremotewrite.Settings{
AddMetricSuffixes: true,
DisableTargetInfo: true,
}
annots, err := promConverter.FromMetrics(ctx, convertToMetricsAttributes(req.Metrics()), setting)
ws, _ := annots.AsStrings("", 0, 0)
if len(ws) > 0 {
level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
}

req, err := remote.DecodeOTLPWriteRequest(r)
if err != nil {
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand All @@ -60,8 +55,16 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
SkipLabelNameValidation: false,
}

// otlp to prompb TimeSeries
promTsList, err := convertToPromTS(r.Context(), req.Metrics(), cfg, overrides, userID, logger)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

// convert prompb to cortexpb TimeSeries
tsList := []cortexpb.PreallocTimeseries(nil)
for _, v := range promConverter.TimeSeries() {
for _, v := range promTsList {
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
Labels: makeLabels(v.Labels),
Samples: makeSamples(v.Samples),
Expand All @@ -87,6 +90,35 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
})
}

func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, error) {
promConverter := prometheusremotewrite.NewPrometheusConverter()
settings := prometheusremotewrite.Settings{
AddMetricSuffixes: true,
DisableTargetInfo: cfg.DisableTargetInfo,
}

var annots annotations.Annotations
var err error

if cfg.ConvertAllAttributes {
annots, err = promConverter.FromMetrics(ctx, convertToMetricsAttributes(pmetrics), settings)
} else {
settings.PromoteResourceAttributes = overrides.PromoteResourceAttributes(userID)
annots, err = promConverter.FromMetrics(ctx, pmetrics, settings)
}

ws, _ := annots.AsStrings("", 0, 0)
if len(ws) > 0 {
level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
}

if err != nil {
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
return nil, err
}
return promConverter.TimeSeries(), nil
}

func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter {
out := make(labels.Labels, 0, len(in))
for _, l := range in {
Expand Down
Loading

0 comments on commit 2e5488a

Please sign in to comment.