diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index b4448aa633..3679f4bf9f 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -67,6 +67,11 @@ func DefaultPrometheusImage() string { return "quay.io/prometheus/prometheus:v2.41.0" } +// DefaultOtelImage sets default Otel image used in e2e service. +func DefaultOtelImage() string { + return "otel/opentelemetry-collector-contrib:0.98.0" +} + // DefaultAlertmanagerImage sets default Alertmanager image used in e2e service. func DefaultAlertmanagerImage() string { return "quay.io/prometheus/alertmanager:v0.20.0" @@ -130,6 +135,30 @@ func NewPrometheus(e e2e.Environment, name, promConfig, webConfig, promImage str })), "http") } +func NewOtel(e e2e.Environment, name, otelConfig, otelImage string) *e2eobs.Observable { + f := e.Runnable(name).WithPorts(map[string]int{"http": 9090}).Future() + if err := os.MkdirAll(f.Dir(), 0750); err != nil { + return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "create otel dir"))} + } + + if err := os.WriteFile(filepath.Join(f.Dir(), "otel.yaml"), []byte(otelConfig), 0600); err != nil { + return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "creating otel config"))} + } + + //probe := e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200) + args := e2e.BuildArgs(map[string]string{ + "--config": filepath.Join(f.InternalDir(), "otel.yaml"), + //"--log.level": infoLogLevel, + //"--web.listen-address": ":9090", + }) + + return e2eobs.AsObservable(f.Init(wrapWithDefaults(e2e.StartOptions{ + Image: otelImage, + Command: e2e.NewCommandWithoutEntrypoint("/otelcol-contrib", args...), + //Readiness: probe, + })), "http") +} + func NewPrometheusWithSidecar(e e2e.Environment, name, promConfig, webConfig, promImage, minTime string, enableFeatures ...string) (*e2eobs.Observable, *e2eobs.Observable) { return NewPrometheusWithSidecarCustomImage(e, name, promConfig, webConfig, promImage, minTime, DefaultImage(), enableFeatures...) } @@ -523,6 +552,8 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { return args, nil } +func OTLPEndpoint(addr string) string { return fmt.Sprintf("http://%s", addr) } + func RemoteWriteEndpoint(addr string) string { return fmt.Sprintf("http://%s/api/v1/receive", addr) } func RemoteWriteEndpoints(addrs ...string) string { @@ -1250,6 +1281,53 @@ rule_files: return config } +// DefaultOtelConfig returns Otel config that sets Otel to: +// * expose 2 external labels, source and replica. +// * optionally scrape self. This will produce up == 0 metric which we can assert on. +// * optionally remote write endpoint to write into. +func DefaultOtelConfig(remoteWriteEndpoint string) string { + config := fmt.Sprintf(` +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + prometheus: + config: + scrape_configs: + - job_name: otel-collector + scrape_interval: 5s + static_configs: + - targets: [localhost:8888] +exporters: + otlphttp/thanos: + endpoint: "%s" + tls: + insecure: true + debug: + verbosity: detailed +extensions: + health_check: + pprof: +service: + telemetry: + logs: + level: "debug" + extensions: [pprof, health_check] + pipelines: + metrics: + receivers: + - prometheus + - otlp + exporters: + - otlphttp/thanos +`, remoteWriteEndpoint) + + return config +} + func NewRedis(e e2e.Environment, name string) e2e.Runnable { return e.Runnable(fmt.Sprintf("redis-%s", name)).WithPorts(map[string]int{"redis": 6379}).Init( e2e.StartOptions{ diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 7d841e9a7e..0db8e21339 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -109,6 +109,58 @@ func TestReceive(t *testing.T) { }) }) + t.Run("ingestor_otlp", func(t *testing.T) { + /* + The single_ingestor suite represents the simplest possible configuration of Thanos Receive. + ┌──────────┐ + │ Prom │ + └────┬─────┘ + │ + ┌────▼─────┐ + │ Ingestor │ + └────┬─────┘ + │ + ┌────▼─────┐ + │ Query │ + └──────────┘ + NB: Made with asciiflow.com - you can copy & paste the above there to modify. + */ + + t.Parallel() + e, err := e2e.NewDockerEnvironment("ingestor-otlp") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + // Setup Router Ingestor. + i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) + + // Setup Otel + otel := e2ethanos.NewOtel(e, "1", e2ethanos.DefaultOtelConfig(e2ethanos.OTLPEndpoint(i.InternalEndpoint("remote-write"))), e2ethanos.DefaultOtelImage()) + testutil.Ok(t, e2e.StartAndWaitReady(otel)) + + q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + println("test") + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"otelcol_process_uptime_total"}, e2emon.WaitMissingMetrics())) + + // We expect the data from each Prometheus instance to be replicated twice across our ingesting instances + queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "job": "otel-collector", + "receive": "receive-ingestor", + "service_name": "otelcol-contrib", + "tenant_id": "default-tenant", + }, + }) + }) + t.Run("ha_ingestor_with_ha_prom", func(t *testing.T) { /* The ha_ingestor_with_ha_prom suite represents a configuration of a