diff --git a/go.mod b/go.mod index 39e37512d7..0edd410250 100644 --- a/go.mod +++ b/go.mod @@ -120,6 +120,7 @@ require ( github.com/onsi/gomega v1.34.0 github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 github.com/seiflotfy/cuckoofilter v0.0.0-20240715131351-a2f2c23f1771 + go.opentelemetry.io/collector/pdata v1.14.1 go.opentelemetry.io/contrib/propagators/autoprop v0.54.0 go4.org/intern v0.0.0-20230525184215-6c62f75575cb golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 @@ -154,7 +155,6 @@ require ( github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/sercand/kuberesolver/v4 v4.0.0 // indirect github.com/zhangyunhao116/umap v0.0.0-20221211160557-cb7705fafa39 // indirect - go.opentelemetry.io/collector/pdata v1.14.1 // indirect go.opentelemetry.io/collector/semconv v0.108.1 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/propagators/ot v1.29.0 // indirect diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 163a29a2e1..96b3e9de0a 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -275,6 +275,18 @@ func NewHandler(logger log.Logger, o *Options) *Handler { ), ) + h.router.Post( + "/v1/metrics", + instrf( + "otlp", + readyf( + middleware.RequestID( + http.HandlerFunc(h.receiveOTLPHTTP), + ), + ), + ), + ) + statusAPI := statusapi.New(statusapi.Options{ GetStats: h.getStats, Registry: h.options.Registry, diff --git a/pkg/receive/handler_otlp.go b/pkg/receive/handler_otlp.go new file mode 100644 index 0000000000..ab4ae122f4 --- /dev/null +++ b/pkg/receive/handler_otlp.go @@ -0,0 +1,214 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "context" + "net/http" + "strconv" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" + "github.com/thanos-io/thanos/pkg/store/labelpb" + tprompb "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + "github.com/thanos-io/thanos/pkg/tenancy" + "github.com/thanos-io/thanos/pkg/tracing" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func (h *Handler) receiveOTLPHTTP(w http.ResponseWriter, r *http.Request) { + var err error + span, ctx := tracing.StartSpan(r.Context(), "receiveOTLPHTTP") + span.SetTag("receiver.mode", string(h.receiverMode)) + defer span.Finish() + + tenant, err := tenancy.GetTenantFromHTTP(r, h.options.TenantHeader, h.options.DefaultTenantID, h.options.TenantField) + if err != nil { + level.Error(h.logger).Log("msg", "error getting tenant from HTTP", "err", err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + tLogger := log.With(h.logger, "tenant", tenant) + span.SetTag("tenant", tenant) + + writeGate := h.Limiter.WriteGate() + tracing.DoInSpan(r.Context(), "receive_write_gate_ismyturn", func(ctx context.Context) { + err = writeGate.Start(r.Context()) + }) + + defer writeGate.Done() + if err != nil { + level.Error(tLogger).Log("err", err, "msg", "internal server error") + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + under, err := h.Limiter.HeadSeriesLimiter().isUnderLimit(tenant) + if err != nil { + level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error()) + } + + // Fail request fully if tenant has exceeded 
set limit.
+	if !under {
+		http.Error(w, "tenant is above active series limit", http.StatusTooManyRequests)
+		return
+	}
+
+	requestLimiter := h.Limiter.RequestLimiter()
+	req, err := remote.DecodeOTLPWriteRequest(r)
+	if err != nil {
+		level.Error(tLogger).Log("msg", "Error decoding OTLP write request", "err", err.Error())
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	promTimeSeries, err := h.convertToPrometheusFormat(ctx, req.Metrics())
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
+	prwMetrics := make([]tprompb.TimeSeries, 0, len(promTimeSeries))
+	totalSamples := 0
+	var tpromTs tprompb.TimeSeries
+
+	for _, ts := range promTimeSeries {
+		tpromTs = tprompb.TimeSeries{
+			Labels:    makeLabels(ts.Labels),
+			Samples:   makeSamples(ts.Samples),
+			Exemplars: makeExemplars(ts.Exemplars),
+		}
+		totalSamples += len(ts.Samples)
+		prwMetrics = append(prwMetrics, tpromTs)
+	}
+
+	if !requestLimiter.AllowSeries(tenant, int64(len(prwMetrics))) {
+		http.Error(w, "too many timeseries", http.StatusRequestEntityTooLarge)
+		return
+	}
+
+	if !requestLimiter.AllowSamples(tenant, int64(totalSamples)) {
+		http.Error(w, "too many samples", http.StatusRequestEntityTooLarge)
+		return
+	}
+
+	rep := uint64(0)
+	// If the header is empty, we assume the request is not yet replicated.
+	if replicaRaw := r.Header.Get(h.options.ReplicaHeader); replicaRaw != "" {
+		if rep, err = strconv.ParseUint(replicaRaw, 10, 64); err != nil {
+			http.Error(w, "could not parse replica header", http.StatusBadRequest)
+			return
+		}
+	}
+
+	wreq := tprompb.WriteRequest{
+		Timeseries: prwMetrics,
+		// TODO: Handle metadata; requires Thanos Receive support for ingesting metadata.
+		//Metadata: otlptranslator.OtelMetricsToMetadata(),
+	}
+
+	// Exit early if the request contained no data. We don't support metadata yet. We also cannot fail here, because
+	// this would mean lack of forward compatibility for remote write proto.
+	if len(wreq.Timeseries) == 0 {
+		// TODO(yeya24): Handle remote write metadata.
+		if len(wreq.Metadata) > 0 {
+			// TODO(bwplotka): Do we need this error message?
+			level.Debug(tLogger).Log("msg", "only metadata from client; metadata ingestion not supported; skipping")
+			return
+		}
+		level.Debug(tLogger).Log("msg", "empty remote write request; client bug or newer remote write protocol used?; skipping")
+		return
+	}
+
+	// Apply relabeling configs.
+	h.relabel(&wreq)
+	if len(wreq.Timeseries) == 0 {
+		level.Debug(tLogger).Log("msg", "remote write request dropped due to relabeling.")
+		return
+	}
+
+	responseStatusCode := http.StatusOK
+	tenantStats, err := h.handleRequest(ctx, rep, tenant, &wreq)
+	if err != nil {
+		level.Debug(tLogger).Log("msg", "failed to handle request", "err", err.Error())
+		switch errors.Cause(err) {
+		case errNotReady:
+			responseStatusCode = http.StatusServiceUnavailable
+		case errUnavailable:
+			responseStatusCode = http.StatusServiceUnavailable
+		case errConflict:
+			responseStatusCode = http.StatusConflict
+		case errBadReplica:
+			responseStatusCode = http.StatusBadRequest
+		default:
+			level.Error(tLogger).Log("err", err, "msg", "internal server error")
+			responseStatusCode = http.StatusInternalServerError
+		}
+		http.Error(w, err.Error(), responseStatusCode)
+	}
+
+	for tenant, stats := range tenantStats {
+		h.writeTimeseriesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(stats.timeseries))
+		h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(stats.totalSamples))
+	}
+
+}
+
+func (h *Handler) convertToPrometheusFormat(ctx context.Context, pmetrics pmetric.Metrics) ([]prompb.TimeSeries, error) {
+	promConverter := prometheusremotewrite.NewPrometheusConverter()
+	settings := prometheusremotewrite.Settings{
+		AddMetricSuffixes:         true,
+		DisableTargetInfo:         true,                                          // TODO: this should be made configurable.
+		PromoteResourceAttributes: []string{"service.name", "service.namespace"}, // TODO: this should be made configurable.
+	}
+
+	annots, err := promConverter.FromMetrics(ctx, pmetrics, settings)
+	ws, _ := annots.AsStrings("", 0, 0)
+	if len(ws) > 0 {
+		level.Warn(h.logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
+	}
+
+	if err != nil {
+		level.Error(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
+		return nil, err
+	}
+
+	return promConverter.TimeSeries(), nil
+}
+
+func makeLabels(in []prompb.Label) []labelpb.ZLabel {
+	out := make([]labelpb.ZLabel, 0, len(in))
+	for _, l := range in {
+		out = append(out, labelpb.ZLabel{Name: l.Name, Value: l.Value})
+	}
+	return out
+}
+
+func makeSamples(in []prompb.Sample) []tprompb.Sample {
+	out := make([]tprompb.Sample, 0, len(in))
+	for _, s := range in {
+		out = append(out, tprompb.Sample{
+			Value:     s.Value,
+			Timestamp: s.Timestamp,
+		})
+	}
+	return out
+}
+
+func makeExemplars(in []prompb.Exemplar) []tprompb.Exemplar {
+	out := make([]tprompb.Exemplar, 0, len(in))
+	for _, e := range in {
+		out = append(out, tprompb.Exemplar{
+			Labels:    makeLabels(e.Labels),
+			Value:     e.Value,
+			Timestamp: e.Timestamp,
+		})
+	}
+	return out
+}
diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go
index b4448aa633..3679f4bf9f 100644
--- a/test/e2e/e2ethanos/services.go
+++ b/test/e2e/e2ethanos/services.go
@@ -67,6 +67,11 @@ func DefaultPrometheusImage() string {
 	return "quay.io/prometheus/prometheus:v2.41.0"
 }
 
+// DefaultOtelImage sets default Otel image used in e2e service.
+func DefaultOtelImage() string {
+	return "otel/opentelemetry-collector-contrib:0.98.0"
+}
+
 // DefaultAlertmanagerImage sets default Alertmanager image used in e2e service.
func DefaultAlertmanagerImage() string {
 	return "quay.io/prometheus/alertmanager:v0.20.0"
@@ -130,6 +135,30 @@ func NewPrometheus(e e2e.Environment, name, promConfig, webConfig, promImage str
 	})), "http")
 }
 
+func NewOtel(e e2e.Environment, name, otelConfig, otelImage string) *e2eobs.Observable {
+	f := e.Runnable(name).WithPorts(map[string]int{"http": 9090}).Future()
+	if err := os.MkdirAll(f.Dir(), 0750); err != nil {
+		return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "create otel dir"))}
+	}
+
+	if err := os.WriteFile(filepath.Join(f.Dir(), "otel.yaml"), []byte(otelConfig), 0600); err != nil {
+		return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "creating otel config"))}
+	}
+
+	//probe := e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200)
+	args := e2e.BuildArgs(map[string]string{
+		"--config": filepath.Join(f.InternalDir(), "otel.yaml"),
+		//"--log.level": infoLogLevel,
+		//"--web.listen-address": ":9090",
+	})
+
+	return e2eobs.AsObservable(f.Init(wrapWithDefaults(e2e.StartOptions{
+		Image:   otelImage,
+		Command: e2e.NewCommandWithoutEntrypoint("/otelcol-contrib", args...),
+		//Readiness: probe,
+	})), "http")
+}
+
 func NewPrometheusWithSidecar(e e2e.Environment, name, promConfig, webConfig, promImage, minTime string, enableFeatures ...string) (*e2eobs.Observable, *e2eobs.Observable) {
 	return NewPrometheusWithSidecarCustomImage(e, name, promConfig, webConfig, promImage, minTime, DefaultImage(), enableFeatures...)
 }
@@ -523,6 +552,8 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
 	return args, nil
 }
 
+func OTLPEndpoint(addr string) string { return fmt.Sprintf("http://%s", addr) }
+
 func RemoteWriteEndpoint(addr string) string { return fmt.Sprintf("http://%s/api/v1/receive", addr) }
 
 func RemoteWriteEndpoints(addrs ...string) string {
@@ -1250,6 +1281,53 @@ rule_files:
 	return config
 }
 
+// DefaultOtelConfig returns an OTel Collector config that:
+// * receives OTLP metrics over gRPC (4317) and HTTP (4318).
+// * scrapes the collector's own metrics. This produces an up metric which we can assert on.
+// * exports everything to the given endpoint via the otlphttp exporter.
+func DefaultOtelConfig(remoteWriteEndpoint string) string {
+	config := fmt.Sprintf(`
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        endpoint: 0.0.0.0:4317
+      http:
+        endpoint: 0.0.0.0:4318
+  prometheus:
+    config:
+      scrape_configs:
+        - job_name: otel-collector
+          scrape_interval: 5s
+          static_configs:
+            - targets: [localhost:8888]
+exporters:
+  otlphttp/thanos:
+    endpoint: "%s"
+    tls:
+      insecure: true
+  debug:
+    verbosity: detailed
+extensions:
+  health_check:
+  pprof:
+service:
+  telemetry:
+    logs:
+      level: "debug"
+  extensions: [pprof, health_check]
+  pipelines:
+    metrics:
+      receivers:
+        - prometheus
+        - otlp
+      exporters:
+        - otlphttp/thanos
+`, remoteWriteEndpoint)
+
+	return config
+}
+
 func NewRedis(e e2e.Environment, name string) e2e.Runnable {
 	return e.Runnable(fmt.Sprintf("redis-%s", name)).WithPorts(map[string]int{"redis": 6379}).Init(
 		e2e.StartOptions{
diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go
index 7d841e9a7e..0db8e21339 100644
--- a/test/e2e/receive_test.go
+++ b/test/e2e/receive_test.go
@@ -109,6 +109,58 @@ func TestReceive(t *testing.T) {
 		})
 	})
 
+	t.Run("ingestor_otlp", func(t *testing.T) {
+		/*
+			The ingestor_otlp suite represents the simplest possible OTLP ingestion configuration of Thanos Receive: an OTel Collector pushing metrics over OTLP HTTP into a single ingestor.
+			┌──────────┐
+			│   Otel   │
+			└────┬─────┘
+			     │
+			┌────▼─────┐
+			│ Ingestor │
+			└────┬─────┘
+			     │
+			┌────▼─────┐
+			│  Query   │
+			└──────────┘
+			NB: Made with asciiflow.com - you can copy & paste the above there to modify.
+		*/
+
+		t.Parallel()
+		e, err := e2e.NewDockerEnvironment("ingestor-otlp")
+		testutil.Ok(t, err)
+		t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+		// Setup the ingestor.
+		i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init()
+		testutil.Ok(t, e2e.StartAndWaitReady(i))
+
+		// Setup the OTel Collector pushing OTLP metrics into the ingestor.
+		otel := e2ethanos.NewOtel(e, "1", e2ethanos.DefaultOtelConfig(e2ethanos.OTLPEndpoint(i.InternalEndpoint("remote-write"))), e2ethanos.DefaultOtelImage())
+		testutil.Ok(t, e2e.StartAndWaitReady(otel))
+
+		q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
+		testutil.Ok(t, e2e.StartAndWaitReady(q))
+
+		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+		t.Cleanup(cancel)
+
+		// Wait until the collector's self-reported uptime metric shows up before querying.
+		testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"otelcol_process_uptime_total"}, e2emon.WaitMissingMetrics()))
+
+		// We expect the collector's self-scraped up series to arrive via OTLP and be queryable through the ingestor.
+		queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{
+			Deduplicate: false,
+		}, []model.Metric{
+			{
+				"job":          "otel-collector",
+				"receive":      "receive-ingestor",
+				"service_name": "otelcol-contrib",
+				"tenant_id":    "default-tenant",
+			},
+		})
+	})
+
 	t.Run("ha_ingestor_with_ha_prom", func(t *testing.T) {
 		/*
 			The ha_ingestor_with_ha_prom suite represents a configuration of a