Skip to content

Commit

Permalink
[FEATURE] adding otlp endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas Takashi <[email protected]>
  • Loading branch information
nicolastakashi committed Dec 16, 2024
1 parent 4415626 commit 58e4bc2
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 0 deletions.
78 changes: 78 additions & 0 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func DefaultPrometheusImage() string {
return "quay.io/prometheus/prometheus:v2.41.0"
}

// DefaultOtelImage sets default Otel image used in e2e service.
func DefaultOtelImage() string {
return "otel/opentelemetry-collector-contrib:0.98.0"
}

// DefaultAlertmanagerImage sets default Alertmanager image used in e2e service.
func DefaultAlertmanagerImage() string {
return "quay.io/prometheus/alertmanager:v0.20.0"
Expand Down Expand Up @@ -130,6 +135,30 @@ func NewPrometheus(e e2e.Environment, name, promConfig, webConfig, promImage str
})), "http")
}

func NewOtel(e e2e.Environment, name, otelConfig, otelImage string) *e2eobs.Observable {
f := e.Runnable(name).WithPorts(map[string]int{"http": 9090}).Future()
if err := os.MkdirAll(f.Dir(), 0750); err != nil {
return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "create otel dir"))}
}

if err := os.WriteFile(filepath.Join(f.Dir(), "otel.yaml"), []byte(otelConfig), 0600); err != nil {
return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "creating otel config"))}
}

//probe := e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200)
args := e2e.BuildArgs(map[string]string{
"--config": filepath.Join(f.InternalDir(), "otel.yaml"),
//"--log.level": infoLogLevel,
//"--web.listen-address": ":9090",
})

return e2eobs.AsObservable(f.Init(wrapWithDefaults(e2e.StartOptions{
Image: otelImage,
Command: e2e.NewCommandWithoutEntrypoint("/otelcol-contrib", args...),
//Readiness: probe,
})), "http")
}

func NewPrometheusWithSidecar(e e2e.Environment, name, promConfig, webConfig, promImage, minTime string, enableFeatures ...string) (*e2eobs.Observable, *e2eobs.Observable) {
return NewPrometheusWithSidecarCustomImage(e, name, promConfig, webConfig, promImage, minTime, DefaultImage(), enableFeatures...)
}
Expand Down Expand Up @@ -523,6 +552,8 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
return args, nil
}

func OTLPEndpoint(addr string) string { return fmt.Sprintf("http://%s", addr) }

func RemoteWriteEndpoint(addr string) string { return fmt.Sprintf("http://%s/api/v1/receive", addr) }

func RemoteWriteEndpoints(addrs ...string) string {
Expand Down Expand Up @@ -1250,6 +1281,53 @@ rule_files:
return config
}

// DefaultOtelConfig returns Otel config that sets Otel to:
// * expose 2 external labels, source and replica.
// * optionally scrape self. This will produce up == 0 metric which we can assert on.
// * optionally remote write endpoint to write into.
func DefaultOtelConfig(remoteWriteEndpoint string) string {
config := fmt.Sprintf(`
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
prometheus:
config:
scrape_configs:
- job_name: otel-collector
scrape_interval: 5s
static_configs:
- targets: [localhost:8888]
exporters:
otlphttp/thanos:
endpoint: "%s"
tls:
insecure: true
debug:
verbosity: detailed
extensions:
health_check:
pprof:
service:
telemetry:
logs:
level: "debug"
extensions: [pprof, health_check]
pipelines:
metrics:
receivers:
- prometheus
- otlp
exporters:
- otlphttp/thanos
`, remoteWriteEndpoint)

return config
}

func NewRedis(e e2e.Environment, name string) e2e.Runnable {
return e.Runnable(fmt.Sprintf("redis-%s", name)).WithPorts(map[string]int{"redis": 6379}).Init(
e2e.StartOptions{
Expand Down
52 changes: 52 additions & 0 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,58 @@ func TestReceive(t *testing.T) {
})
})

t.Run("ingestor_otlp", func(t *testing.T) {
/*
The single_ingestor suite represents the simplest possible configuration of Thanos Receive.
┌──────────┐
│ Prom │
└────┬─────┘
┌────▼─────┐
│ Ingestor │
└────┬─────┘
┌────▼─────┐
│ Query │
└──────────┘
NB: Made with asciiflow.com - you can copy & paste the above there to modify.
*/

t.Parallel()
e, err := e2e.NewDockerEnvironment("ingestor-otlp")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

// Setup Router Ingestor.
i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

// Setup Otel
otel := e2ethanos.NewOtel(e, "1", e2ethanos.DefaultOtelConfig(e2ethanos.OTLPEndpoint(i.InternalEndpoint("remote-write"))), e2ethanos.DefaultOtelImage())
testutil.Ok(t, e2e.StartAndWaitReady(otel))

q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)

println("test")
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2emon.Equals(1), []string{"otelcol_process_uptime_total"}, e2emon.WaitMissingMetrics()))

// We expect the data from each Prometheus instance to be replicated twice across our ingesting instances
queryAndAssertSeries(t, ctx, q.Endpoint("http"), e2ethanos.QueryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
"job": "otel-collector",
"receive": "receive-ingestor",
"service_name": "otelcol-contrib",
"tenant_id": "default-tenant",
},
})
})

t.Run("ha_ingestor_with_ha_prom", func(t *testing.T) {
/*
The ha_ingestor_with_ha_prom suite represents a configuration of a
Expand Down

0 comments on commit 58e4bc2

Please sign in to comment.