Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributor: do not propagate errors with non-utf8 characters #10236

Merged
merged 4 commits into from
Dec 15, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## main / unreleased

* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145

Expand Down
5 changes: 3 additions & 2 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,15 @@ func httpRetryableToOTLPRetryable(httpStatusCode int) int {
// writeErrorToHTTPResponseBody converts the given error into a grpc status and marshals it into a byte slice, in order to be written to the response body.
// See doc https://opentelemetry.io/docs/specs/otlp/#failures-1
func writeErrorToHTTPResponseBody(reqCtx context.Context, w http.ResponseWriter, httpCode int, grpcCode codes.Code, msg string, logger log.Logger) {
validUTF8Msg := validUTF8Message(msg)
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("X-Content-Type-Options", "nosniff")
if server.IsHandledByHttpgrpcServer(reqCtx) {
w.Header().Set(server.ErrorMessageHeaderKey, msg) // If httpgrpc Server wants to convert this HTTP response into error, use this error message, instead of using response body.
w.Header().Set(server.ErrorMessageHeaderKey, validUTF8Msg) // If httpgrpc Server wants to convert this HTTP response into error, use this error message, instead of using response body.
}
w.WriteHeader(httpCode)

respBytes, err := proto.Marshal(status.New(grpcCode, msg).Proto())
respBytes, err := proto.Marshal(status.New(grpcCode, validUTF8Msg).Proto())
if err != nil {
level.Error(logger).Log("msg", "otlp response marshal failed", "err", err)
writeResponseFailedBody, _ := proto.Marshal(status.New(codes.Internal, "failed to marshal OTLP response").Proto())
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func handler(
level.Error(logger).Log(msgs...)
}
addHeaders(w, err, r, code, retryCfg)
http.Error(w, msg, code)
http.Error(w, validUTF8Message(msg), code)
}
})
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,25 @@ func TestOTLPPushHandlerErrorsAreReportedCorrectlyViaHttpgrpc(t *testing.T) {
expectedGrpcErrorMessage: "rpc error: code = Code(400) desc = ReadObjectCB: expect { or n, but found i, error found in #1 byte of ...|invalid|..., bigger context ...|invalid|...",
},

"invalid JSON with non-utf8 characters request returns 400": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/json"}},
},
Url: "/otlp",
Body: []byte("\n\xf6\x16\n\xd3\x02\n\x1d\n\x11container.runtime\x12\x08\n\x06docker\n'\n\x12container.h"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure that the test is correct, but it would be easier to read and understand it if it had a comment explaining which character is not valid UTF-8 and with which other character we expect it to be replaced.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a comment explaining which characters are non-UTF8, and what should they be replaced with. Thank you.

},
expectedResponse: &httpgrpc.HTTPResponse{Code: 400,
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/octet-stream"}},
{Key: "X-Content-Type-Options", Values: []string{"nosniff"}},
},
Body: mustMarshalStatus(t, 400, "ReadObjectCB: expect { or n, but found \ufffd, error found in #2 byte of ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011co|..., bigger context ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011container.runtime\u0012\u0008\n\u0006docker\n'\n\u0012container.h|..."),
},
expectedGrpcErrorMessage: "rpc error: code = Code(400) desc = ReadObjectCB: expect { or n, but found \ufffd, error found in #2 byte of ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011co|..., bigger context ...|\n\ufffd\u0016\n\ufffd\u0002\n\u001d\n\u0011container.runtime\u0012\u0008\n\u0006docker\n'\n\u0012container.h|...",
},

"empty JSON is good request, with 200 status code": {
request: &httpgrpc.HTTPRequest{
Method: "POST",
Expand Down
14 changes: 13 additions & 1 deletion pkg/distributor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userI
return fmt.Errorf(labelNameTooLongMsgFormat, l.Name, mimirpb.FromLabelAdaptersToString(ls))
} else if !skipLabelValidation && !model.LabelValue(l.Value).IsValid() {
m.invalidLabelValue.WithLabelValues(userID, group).Inc()
return fmt.Errorf(invalidLabelValueMsgFormat, l.Name, strings.ToValidUTF8(l.Value, ""), unsafeMetricName)
return fmt.Errorf(invalidLabelValueMsgFormat, l.Name, validUTF8Message(l.Value), unsafeMetricName)
} else if len(l.Value) > maxLabelValueLength {
m.labelValueTooLong.WithLabelValues(userID, group).Inc()
return fmt.Errorf(labelValueTooLongMsgFormat, l.Name, l.Value, mimirpb.FromLabelAdaptersToString(ls))
Expand Down Expand Up @@ -512,3 +512,15 @@ func getMetricAndEllipsis(ls []mimirpb.LabelAdapter) (string, string) {
}
return metric, ellipsis
}

// validUTF8ErrMessage ensures that the given message contains only valid utf8 characters.
// The presence of non-utf8 characters in some errors might break some crucial parts of distributor's logic.
// For example, if httpgrpc.HTTPServer.Handle() returns a httpgprc.Error containing a non-utf8 character,
// this error will not be propagated to httpgrpc.HTTPClient as a htttpgrpc.Error, but as a generic error,
// which might break some of Mimir internal logic.
// This is because golang's proto.Marshal(), which is used by gRPC internally, fails when it marshals the
// httpgrpc.Error containing non-utf8 character produced by httpgrpc.HTTPServer.Handle(), making the resulting
// error lose some important properties.
func validUTF8Message(msg string) string {
return strings.ToValidUTF8(msg, string(utf8.RuneError))
}
65 changes: 64 additions & 1 deletion pkg/distributor/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@ package distributor
import (
"errors"
"fmt"
"net/http"
"strings"
"testing"
"time"
"unicode/utf8"

"github.com/gogo/protobuf/proto"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/httpgrpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
grpcstatus "google.golang.org/grpc/status"
golangproto "google.golang.org/protobuf/proto"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util/validation"
Expand Down Expand Up @@ -212,7 +218,7 @@ func TestValidateLabels(t *testing.T) {
skipLabelCountValidation: false,
err: fmt.Errorf(
invalidLabelValueMsgFormat,
"label1", "abcdef", "foo",
"label1", "abc\ufffddef", "foo",
),
},
{
Expand Down Expand Up @@ -671,3 +677,60 @@ func tooManyLabelsArgs(series []mimirpb.LabelAdapter, limit int) []any {

return []any{len(series), limit, metric, ellipsis}
}

func TestValidUTF8Message(t *testing.T) {
testCases := map[string]struct {
body []byte
containsNonUTF8Characters bool
}{
"valid message returns no error": {
body: []byte("valid message"),
containsNonUTF8Characters: false,
},
"message containing only utf8 characters retruns no error": {
duricanikolic marked this conversation as resolved.
Show resolved Hide resolved
body: []byte("\n\ufffd\u0016\n\ufffd\u0002\n\u001D\n\u0011container.runtime\u0012\b\n\u0006docker\n'\n\u0012container.h"),
containsNonUTF8Characters: false,
},
"message containing non-utf8 character returns an error": {
body: []byte("\n\xf6\x1a\n\xd3\x02\n\x1d\n\x11container.runtime\x12\x08\n\x06docker\n'\n\x12container.h"),
containsNonUTF8Characters: true,
},
}

for name, tc := range testCases {
for _, withValidation := range []bool{false, true} {
t.Run(fmt.Sprintf("%s withValidation: %v", name, withValidation), func(t *testing.T) {
msg := string(tc.body)
if withValidation {
msg = validUTF8Message(msg)
}
httpgrpcErr := httpgrpc.Error(http.StatusBadRequest, msg)

// gogo's proto.Marshal() correctly processes both httpgrpc errors with and without non-utf8 characters.
st, ok := grpcutil.ErrorToStatus(httpgrpcErr)
require.True(t, ok)
stBytes, err := proto.Marshal(st.Proto())
require.NoError(t, err)
require.NotNil(t, stBytes)

//lint:ignore faillint We want to explicitly use on grpcstatus.FromError()
grpcSt, ok := grpcstatus.FromError(httpgrpcErr)
require.True(t, ok)
stBytes, err = golangproto.Marshal(grpcSt.Proto())
if withValidation {
// Ensure that errors with validated messages can always be correctly marshaled.
require.NoError(t, err)
require.NotNil(t, stBytes)
} else {
if tc.containsNonUTF8Characters {
// Ensure that errors with non-validated non-utf8 messages cannot be correctly marshaled.
require.Error(t, err)
} else {
require.NoError(t, err)
require.NotNil(t, stBytes)
}
}
})
}
}
}
Loading