Skip to content

Commit

Permalink
Add otlp exemplar ingestion test (#6244)
Browse files Browse the repository at this point in the history
  • Loading branch information
SungJin1212 authored Sep 27, 2024
1 parent fbe118b commit ff782ca
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 0 deletions.
74 changes: 74 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,73 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics
return metrics
}

func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()

// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
// with resource attributes: service.name="test-service", service.instance.id="test-instance", host.name="test-host"
// with metric attibute: foo.bar="baz"

timestamp := time.Now()

resourceMetric := d.ResourceMetrics().AppendEmpty()
resourceMetric.Resource().Attributes().PutStr("service.name", "test-service")
resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance")
resourceMetric.Resource().Attributes().PutStr("host.name", "test-host")

scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()

// Generate One Counter
counterMetric := scopeMetric.Metrics().AppendEmpty()
counterMetric.SetName(name)
counterMetric.SetDescription("test-counter-description")

counterMetric.SetEmptySum()
counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty()
counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
counterDataPoint.SetDoubleValue(10.0)
counterDataPoint.Attributes().PutStr("foo.bar", "baz")

counterExemplar := counterDataPoint.Exemplars().AppendEmpty()
counterExemplar.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
counterExemplar.SetDoubleValue(10.0)
counterExemplar.SetSpanID(pcommon.SpanID{0, 1, 2, 3, 4, 5, 6, 7})
counterExemplar.SetTraceID(pcommon.TraceID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15})

return pmetricotlp.NewExportRequestFromMetrics(d)
}

func (c *Client) OTLPPushExemplar(name string) (*http.Response, error) {
data, err := otlpWriteRequest(name).MarshalProto()
if err != nil {
return nil, err
}

// Create HTTP request
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/otlp/v1/metrics", c.distributorAddress), bytes.NewReader(data))
if err != nil {
return nil, err
}

req.Header.Set("X-Scope-OrgID", c.orgID)
req.Header.Set("Content-Type", "application/x-protobuf")

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer res.Body.Close()

return res, nil
}

// Push series to OTLP endpoint
func (c *Client) OTLP(timeseries []prompb.TimeSeries) (*http.Response, error) {

Expand Down Expand Up @@ -267,6 +334,13 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
return value, err
}

// QueryExemplars runs an exemplars query
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
return c.querierClient.QueryExemplars(ctx, query, start, end)
}

// QueryRange runs a query range.
func (c *Client) QueryRange(query string, start, end time.Time, step time.Duration) (model.Value, error) {
value, _, err := c.querierClient.QueryRange(context.Background(), query, promv1.Range{
Expand Down
46 changes: 46 additions & 0 deletions integration/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,49 @@ func TestOTLP(t *testing.T) {
require.Equal(t, float64(expectedHistogram.Count), float64(v[0].Histogram.Count))
require.Equal(t, expectedHistogram.Sum, float64(v[0].Histogram.Sum))
}

func TestOTLPIngestExemplar(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
minio := e2edb.NewMinio(9000, bucketName)
require.NoError(t, s.StartAndWaitReady(minio))

// Start Cortex components.
require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks.yaml", cortexConfigFile))

// Start Cortex in single binary mode, reading the config from file and overwriting
// the backend config to make it work with Minio.
flags := map[string]string{
"-blocks-storage.s3.access-key-id": e2edb.MinioAccessKey,
"-blocks-storage.s3.secret-access-key": e2edb.MinioSecretKey,
"-blocks-storage.s3.bucket-name": bucketName,
"-blocks-storage.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
"-blocks-storage.s3.insecure": "true",
"-blocks-storage.tsdb.enable-native-histograms": "true",
"-ingester.max-exemplars": "100",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
"-alertmanager-storage.backend": "local",
"-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"),
}
// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex-1", cortexConfigFile, flags, "", 9009, 9095)
require.NoError(t, s.StartAndWaitReady(cortex))

c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

res, err := c.OTLPPushExemplar("exemplar_1")
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

now := time.Now()
exemplars, err := c.QueryExemplars("exemplar_1", now.Add(-time.Minute), now.Add(time.Minute))
require.NoError(t, err)
require.Equal(t, 1, len(exemplars))
}

0 comments on commit ff782ca

Please sign in to comment.