Skip to content

Commit

Permalink
Disable label name validation (cortexproject#3736)
Browse files Browse the repository at this point in the history
* allow label name validation to be skipped via request param

Signed-off-by: Trevor Whitney <[email protected]>

* expose distributor via grpc, force validation for http

* distributor needs to be registered with grpc server so
  that other services have this new validation bypass option
  when talking to distributor directly
* util used to translate http -> grpc always sets
  SkipLabelNameValidation to false to force validation on all
  http requests as only gRPC requests should be allowed to bypass
  this validation

Signed-off-by: Trevor Whitney <[email protected]>

* rename Distributor service to PushOnlyIngester

Signed-off-by: Trevor Whitney <[email protected]>
  • Loading branch information
trevorwhitney authored Jan 27, 2021
1 parent f0a0c54 commit e414714
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 105 deletions.
2 changes: 2 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ func (a *API) RegisterRuntimeConfig(runtimeCfgManager *runtimeconfig.Manager) {

// RegisterDistributor registers the endpoints associated with the distributor.
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config) {
client.RegisterPushOnlyIngesterServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig, a.sourceIPs, d.Push), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
Expand Down
7 changes: 4 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,9 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
// Validates a single series from a write request. Will remove labels if
// any are configured to be dropped for the user ID.
// Returns the validated series with it's labels/samples, and any error.
func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, userID string) (ingester_client.PreallocTimeseries, error) {
func (d *Distributor) validateSeries(ts ingester_client.PreallocTimeseries, userID string, skipLabelNameValidation bool) (ingester_client.PreallocTimeseries, error) {
labelsHistogram.Observe(float64(len(ts.Labels)))
if err := validation.ValidateLabels(d.limits, userID, ts.Labels, d.cfg.SkipLabelNameValidation); err != nil {
if err := validation.ValidateLabels(d.limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
return emptyPreallocSeries, err
}

Expand Down Expand Up @@ -496,7 +496,8 @@ func (d *Distributor) Push(ctx context.Context, req *ingester_client.WriteReques
return nil, err
}

validatedSeries, err := d.validateSeries(ts, userID)
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
validatedSeries, err := d.validateSeries(ts, userID, skipLabelNameValidation)

// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
Expand Down
54 changes: 54 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/ingester/client"
Expand Down Expand Up @@ -832,6 +833,57 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *
}
}

func TestDistributor_Push_LabelNameValidation(t *testing.T) {
inputLabels := labels.Labels{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "999.illegal", Value: "baz"},
}
tests := map[string]struct {
inputLabels labels.Labels
skipLabelNameValidationCfg bool
skipLabelNameValidationReq bool
errExpected bool
errMessage string
}{
"label name validation is on by default": {
inputLabels: inputLabels,
errExpected: true,
errMessage: `sample invalid label: "999.illegal" metric "foo{999.illegal=\"baz\"}"`,
},
"label name validation can be skipped via config": {
inputLabels: inputLabels,
skipLabelNameValidationCfg: true,
errExpected: false,
},
"label name validation can be skipped via WriteRequest parameter": {
inputLabels: inputLabels,
skipLabelNameValidationReq: true,
errExpected: false,
},
}

for testName, tc := range tests {
t.Run(testName, func(t *testing.T) {
ds, _, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shuffleShardSize: 1,
skipLabelNameValidation: tc.skipLabelNameValidationCfg,
})
req := mockWriteRequest(tc.inputLabels, 42, 100000)
req.SkipLabelNameValidation = tc.skipLabelNameValidationReq
_, err := ds[0].Push(ctx, req)
if tc.errExpected {
fromError, _ := status.FromError(err)
assert.Equal(t, tc.errMessage, fromError.Message())
} else {
assert.Nil(t, err)
}
})
}
}

func TestSlowQueries(t *testing.T) {
nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo")
nIngesters := 3
Expand Down Expand Up @@ -1072,6 +1124,7 @@ type prepConfig struct {
shuffleShardSize int
limits *validation.Limits
numDistributors int
skipLabelNameValidation bool
}

func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *ring.Ring) {
Expand Down Expand Up @@ -1150,6 +1203,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
distributorCfg.DistributorRing.InstanceID = strconv.Itoa(i)
distributorCfg.DistributorRing.KVStore.Mock = kvStore
distributorCfg.DistributorRing.InstanceAddr = "127.0.0.1"
distributorCfg.SkipLabelNameValidation = cfg.skipLabelNameValidation

if cfg.shuffleShardEnabled {
distributorCfg.ShardingStrategy = util.ShardingStrategyShuffle
Expand Down
Loading

0 comments on commit e414714

Please sign in to comment.