Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributor: Move resource attribute promotion specialization to Overrides #10217

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,14 @@ const OTLPPushEndpoint = "/otlp/v1/metrics"
// RegisterDistributor registers the endpoints associated with the distributor.
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
limits.SpecializeResourceAttributePromotionConfig(pushConfig.OTelResourceAttributePromotionConfig)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(
pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader,
a.cfg.SkipLabelCountValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger,
), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(
pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits, pushConfig.OTelResourceAttributePromotionConfig,
pushConfig.MaxOTLPRequestSize, d.RequestBufferPool, a.sourceIPs, limits,
pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger,
), true, false, "POST")

Expand Down
10 changes: 2 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,8 @@ type Distributor struct {
func defaultSleep(d time.Duration) { time.Sleep(d) }
func defaultNow() time.Time { return time.Now() }

// OTelResourceAttributePromotionConfig contains methods for configuring OTel resource attribute promotion.
type OTelResourceAttributePromotionConfig interface {
// PromoteOTelResourceAttributes returns which OTel resource attributes to promote for tenant ID.
PromoteOTelResourceAttributes(id string) []string
}

// Config contains the configuration required to
// create a Distributor
// create a Distributor.
type Config struct {
PoolConfig PoolConfig `yaml:"pool"`

Expand Down Expand Up @@ -247,7 +241,7 @@ type Config struct {
ReusableIngesterPushWorkers int `yaml:"reusable_ingester_push_workers" category:"advanced"`

// OTelResourceAttributePromotionConfig allows for specializing OTel resource attribute promotion.
OTelResourceAttributePromotionConfig OTelResourceAttributePromotionConfig `yaml:"-"`
OTelResourceAttributePromotionConfig validation.OTelResourceAttributePromotionConfig `yaml:"-"`
}

// PushWrapper wraps around a push. It is similar to middleware.Interface.
Expand Down
9 changes: 3 additions & 6 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ const (
)

type OTLPHandlerLimits interface {
validation.OTelResourceAttributePromotionConfig

OTelMetricSuffixesEnabled(id string) bool
OTelCreatedTimestampZeroIngestionEnabled(id string) bool
PromoteOTelResourceAttributes(id string) []string
OTelKeepIdentifyingResourceAttributes(id string) bool
}

Expand All @@ -60,7 +61,6 @@ func OTLPHandler(
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
limits OTLPHandlerLimits,
resourceAttributePromotionConfig OTelResourceAttributePromotionConfig,
retryCfg RetryConfig,
push PushFunc,
pushMetrics *PushMetrics,
Expand Down Expand Up @@ -171,10 +171,7 @@ func OTLPHandler(
}
addSuffixes := limits.OTelMetricSuffixesEnabled(tenantID)
enableCTZeroIngestion := limits.OTelCreatedTimestampZeroIngestionEnabled(tenantID)
if resourceAttributePromotionConfig == nil {
resourceAttributePromotionConfig = limits
}
promoteResourceAttributes := resourceAttributePromotionConfig.PromoteOTelResourceAttributes(tenantID)
promoteResourceAttributes := limits.PromoteOTelResourceAttributes(tenantID)
keepIdentifyingResourceAttributes := limits.OTelKeepIdentifyingResourceAttributes(tenantID)

pushMetrics.IncOTLPRequest(tenantID)
Expand Down
16 changes: 9 additions & 7 deletions pkg/distributor/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func BenchmarkOTLPHandler(b *testing.B) {
validation.NewMockTenantLimits(map[string]*validation.Limits{}),
)
require.NoError(b, err)
handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger())
handler := OTLPHandler(100000, nil, nil, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger())

b.Run("protobuf", func(b *testing.B) {
req := createOTLPProtoRequest(b, exportReq, "")
Expand Down Expand Up @@ -491,7 +491,7 @@ func TestHandlerOTLPPush(t *testing.T) {
expectedRetryHeader bool
promoteResourceAttributes []string
expectedAttributePromotions map[string]string
resourceAttributePromotionConfig OTelResourceAttributePromotionConfig
resourceAttributePromotionConfig validation.OTelResourceAttributePromotionConfig
}

samplesVerifierFunc := func(t *testing.T, pushReq *Request, tc testCase) error {
Expand Down Expand Up @@ -742,6 +742,8 @@ func TestHandlerOTLPPush(t *testing.T) {
}),
)
require.NoError(t, err)
limits.SpecializeResourceAttributePromotionConfig(tt.resourceAttributePromotionConfig)

pusher := func(_ context.Context, pushReq *Request) error {
t.Helper()
t.Cleanup(pushReq.CleanUp)
Expand All @@ -750,7 +752,7 @@ func TestHandlerOTLPPush(t *testing.T) {

logs := &concurrency.SyncBuffer{}
retryConfig := RetryConfig{Enabled: true, MinBackoff: 5 * time.Second, MaxBackoff: 5 * time.Second}
handler := OTLPHandler(tt.maxMsgSize, nil, nil, limits, tt.resourceAttributePromotionConfig, retryConfig, pusher, nil, nil, level.NewFilter(log.NewLogfmtLogger(logs), level.AllowInfo()))
handler := OTLPHandler(tt.maxMsgSize, nil, nil, limits, retryConfig, pusher, nil, nil, level.NewFilter(log.NewLogfmtLogger(logs), level.AllowInfo()))

resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
Expand Down Expand Up @@ -823,7 +825,7 @@ func TestHandler_otlpDroppedMetricsPanic(t *testing.T) {

req := createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "")
resp := httptest.NewRecorder()
handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
handler := OTLPHandler(100000, nil, nil, limits, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
request, err := pushReq.WriteRequest()
assert.NoError(t, err)
assert.Len(t, request.Timeseries, 3)
Expand Down Expand Up @@ -869,7 +871,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {

req := createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "")
resp := httptest.NewRecorder()
handler := OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
handler := OTLPHandler(100000, nil, nil, limits, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
request, err := pushReq.WriteRequest()
t.Cleanup(pushReq.CleanUp)
require.NoError(t, err)
Expand All @@ -895,7 +897,7 @@ func TestHandler_otlpDroppedMetricsPanic2(t *testing.T) {

req = createOTLPProtoRequest(t, pmetricotlp.NewExportRequestFromMetrics(md), "")
resp = httptest.NewRecorder()
handler = OTLPHandler(100000, nil, nil, limits, nil, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
handler = OTLPHandler(100000, nil, nil, limits, RetryConfig{}, func(_ context.Context, pushReq *Request) error {
request, err := pushReq.WriteRequest()
t.Cleanup(pushReq.CleanUp)
require.NoError(t, err)
Expand Down Expand Up @@ -923,7 +925,7 @@ func TestHandler_otlpWriteRequestTooBigWithCompression(t *testing.T) {

resp := httptest.NewRecorder()

handler := OTLPHandler(140, nil, nil, nil, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger())
handler := OTLPHandler(140, nil, nil, nil, RetryConfig{}, readBodyPushFunc(t), nil, nil, log.NewNopLogger())
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusRequestEntityTooLarge, resp.Code)
body, err := io.ReadAll(resp.Body)
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) {

return nil
}
h := OTLPHandler(200, util.NewBufferPool(0), nil, otlpLimitsMock{}, nil, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
h := OTLPHandler(200, util.NewBufferPool(0), nil, otlpLimitsMock{}, RetryConfig{}, push, newPushMetrics(reg), reg, log.NewNopLogger())
srv.HTTP.Handle("/otlp", h)

// start the server
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ type TenantLimits interface {
type Overrides struct {
defaultLimits *Limits
tenantLimits TenantLimits

// otelResourceAttributePromotionCfg, if set, specializes OTel resource attribute promotion configuration.
otelResourceAttributePromotionCfg OTelResourceAttributePromotionConfig
}

// NewOverrides makes a new Overrides.
Expand Down Expand Up @@ -1112,9 +1115,18 @@ func (o *Overrides) OTelCreatedTimestampZeroIngestionEnabled(tenantID string) bo
}

func (o *Overrides) PromoteOTelResourceAttributes(tenantID string) []string {
if o.otelResourceAttributePromotionCfg != nil {
return o.otelResourceAttributePromotionCfg.PromoteOTelResourceAttributes(tenantID)
}
return o.getOverridesForUser(tenantID).PromoteOTelResourceAttributes
}

// SpecializeResourceAttributePromotionConfig specializes OTel resource attribute promotion configuration.
// This is to allow for plugging in a non-default method for configuring resource attribute promotion.
func (o *Overrides) SpecializeResourceAttributePromotionConfig(specialization OTelResourceAttributePromotionConfig) {
o.otelResourceAttributePromotionCfg = specialization
}

func (o *Overrides) OTelKeepIdentifyingResourceAttributes(tenantID string) bool {
return o.getOverridesForUser(tenantID).OTelKeepIdentifyingResourceAttributes
}
Expand Down Expand Up @@ -1147,6 +1159,12 @@ func (o *Overrides) getOverridesForUser(userID string) *Limits {
return o.defaultLimits
}

// OTelResourceAttributePromotionConfig contains methods for configuring OTel resource attribute promotion.
type OTelResourceAttributePromotionConfig interface {
// PromoteOTelResourceAttributes returns which OTel resource attributes to promote for tenant ID.
PromoteOTelResourceAttributes(id string) []string
}

// AllTrueBooleansPerTenant returns true only if limit func is true for all given tenants
func AllTrueBooleansPerTenant(tenantIDs []string, f func(string) bool) bool {
for _, tenantID := range tenantIDs {
Expand Down
37 changes: 37 additions & 0 deletions pkg/util/validation/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,43 @@ alertmanager_max_grafana_state_size_bytes: "0"
}
}

func TestOverrides_PromoteOTelResourceAttributes(t *testing.T) {
const tenant = "tenant"

t.Run("default implementation", func(t *testing.T) {
overrides, err := NewOverrides(
Limits{PromoteOTelResourceAttributes: []string{"default.attribute"}},
NewMockTenantLimits(map[string]*Limits{}),
)
require.NoError(t, err)

attrs := overrides.PromoteOTelResourceAttributes(tenant)
require.Equal(t, []string{"default.attribute"}, attrs)
})

t.Run("specialized implementation", func(t *testing.T) {
overrides, err := NewOverrides(
Limits{PromoteOTelResourceAttributes: []string{"default.attribute"}},
NewMockTenantLimits(map[string]*Limits{}),
)
require.NoError(t, err)

overrides.SpecializeResourceAttributePromotionConfig(fakeOTelResourceAttributePromotionConfig{
resourceAttrs: map[string][]string{tenant: {"specialized.attribute"}},
})
attrs := overrides.PromoteOTelResourceAttributes(tenant)
require.Equal(t, []string{"specialized.attribute"}, attrs)
})
}

type fakeOTelResourceAttributePromotionConfig struct {
resourceAttrs map[string][]string
}

func (c fakeOTelResourceAttributePromotionConfig) PromoteOTelResourceAttributes(tenant string) []string {
return c.resourceAttrs[tenant]
}

func getDefaultLimits() Limits {
limits := Limits{}
flagext.DefaultValues(&limits)
Expand Down
Loading