From ff782ca9f97ad54303f417a248e89f6e38910b58 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Sat, 28 Sep 2024 07:34:03 +0900 Subject: [PATCH] Add otlp exemplar ingestion test (#6244) --- integration/e2ecortex/client.go | 74 +++++++++++++++++++++++++++++++++ integration/otlp_test.go | 46 ++++++++++++++++++++ 2 files changed, 120 insertions(+) diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index b6ea5fb7f9..4ff4fa506a 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -231,6 +231,73 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics return metrics } +func otlpWriteRequest(name string) pmetricotlp.ExportRequest { + d := pmetric.NewMetrics() + + // Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram + // with resource attributes: service.name="test-service", service.instance.id="test-instance", host.name="test-host" + // with metric attibute: foo.bar="baz" + + timestamp := time.Now() + + resourceMetric := d.ResourceMetrics().AppendEmpty() + resourceMetric.Resource().Attributes().PutStr("service.name", "test-service") + resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance") + resourceMetric.Resource().Attributes().PutStr("host.name", "test-host") + + scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty() + + // Generate One Counter + counterMetric := scopeMetric.Metrics().AppendEmpty() + counterMetric.SetName(name) + counterMetric.SetDescription("test-counter-description") + + counterMetric.SetEmptySum() + counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty() + counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + counterDataPoint.SetDoubleValue(10.0) + counterDataPoint.Attributes().PutStr("foo.bar", "baz") + + counterExemplar := counterDataPoint.Exemplars().AppendEmpty() + counterExemplar.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + counterExemplar.SetDoubleValue(10.0) + counterExemplar.SetSpanID(pcommon.SpanID{0, 1, 2, 3, 4, 5, 6, 7}) + counterExemplar.SetTraceID(pcommon.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}) + + return pmetricotlp.NewExportRequestFromMetrics(d) +} + +func (c *Client) OTLPPushExemplar(name string) (*http.Response, error) { + data, err := otlpWriteRequest(name).MarshalProto() + if err != nil { + return nil, err + } + + // Create HTTP request + req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/otlp/v1/metrics", c.distributorAddress), bytes.NewReader(data)) + if err != nil { + return nil, err + } + + req.Header.Set("X-Scope-OrgID", c.orgID) + req.Header.Set("Content-Type", "application/x-protobuf") + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + // Execute HTTP request + res, err := c.httpClient.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + + defer res.Body.Close() + + return res, nil +} + // Push series to OTLP endpoint func (c *Client) OTLP(timeseries []prompb.TimeSeries) (*http.Response, error) { @@ -267,6 +334,13 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) { return value, err } +// QueryExemplars runs an exemplars query +func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + return c.querierClient.QueryExemplars(ctx, query, start, end) +} + // QueryRange runs a query range. func (c *Client) QueryRange(query string, start, end time.Time, step time.Duration) (model.Value, error) { value, _, err := c.querierClient.QueryRange(context.Background(), query, promv1.Range{ diff --git a/integration/otlp_test.go b/integration/otlp_test.go index da689e7b67..463d4d24d5 100644 --- a/integration/otlp_test.go +++ b/integration/otlp_test.go @@ -98,3 +98,49 @@ func TestOTLP(t *testing.T) { require.Equal(t, float64(expectedHistogram.Count), float64(v[0].Histogram.Count)) require.Equal(t, expectedHistogram.Sum, float64(v[0].Histogram.Sum)) } + +func TestOTLPIngestExemplar(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(minio)) + + // Start Cortex components. + require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks.yaml", cortexConfigFile)) + + // Start Cortex in single binary mode, reading the config from file and overwriting + // the backend config to make it work with Minio. + flags := map[string]string{ + "-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey, + "-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey, + "-blocks-storage.s3.bucket-name": bucketName, + "-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName), + "-blocks-storage.s3.insecure": "true", + "-blocks-storage.tsdb.enable-native-histograms": "true", + "-ingester.max-exemplars": "100", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + "-alertmanager-storage.backend": "local", + "-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"), + } + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex-1", cortexConfigFile, flags, "", 9009, 9095) + require.NoError(t, s.StartAndWaitReady(cortex)) + + c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + res, err := c.OTLPPushExemplar("exemplar_1") + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + now := time.Now() + exemplars, err := c.QueryExemplars("exemplar_1", now.Add(-time.Minute), now.Add(time.Minute)) + require.NoError(t, err) + require.Equal(t, 1, len(exemplars)) +}