Merge branch 'master' into feat/external-labels
Signed-off-by: Friedrich Gonzalez <[email protected]>
friedrichg authored Nov 15, 2024
2 parents 24be605 + 7c973ee commit fdd1cf4
Showing 11 changed files with 324 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/scorecards.yml
@@ -64,6 +64,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard (optional).
# Commenting out will disable upload of results to your repo's Code Scanning dashboard
- name: "Upload to code-scanning"
- uses: github/codeql-action/upload-sarif@662472033e021d55d94146f66f6058822b0b39fd # v3.27.0
+ uses: github/codeql-action/upload-sarif@ea9e4e37992a54ee68a9622e985e60c8e8f12d9f # v3.27.4
with:
sarif_file: results.sarif
6 changes: 3 additions & 3 deletions .github/workflows/test-build-deploy.yml
@@ -75,15 +75,15 @@ jobs:

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
- uses: github/codeql-action/init@662472033e021d55d94146f66f6058822b0b39fd # v3.27.0
+ uses: github/codeql-action/init@ea9e4e37992a54ee68a9622e985e60c8e8f12d9f # v3.27.4
with:
languages: go

- name: Autobuild
- uses: github/codeql-action/autobuild@662472033e021d55d94146f66f6058822b0b39fd # v3.27.0
+ uses: github/codeql-action/autobuild@ea9e4e37992a54ee68a9622e985e60c8e8f12d9f # v3.27.4

- name: Perform CodeQL Analysis
- uses: github/codeql-action/analyze@662472033e021d55d94146f66f6058822b0b39fd # v3.27.0
+ uses: github/codeql-action/analyze@ea9e4e37992a54ee68a9622e985e60c8e8f12d9f # v3.27.4


build:
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Ruler: Add support for per-user external labels. #6340
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
* [ENHANCEMENT] S3 Bucket Client: Add list objects version configs to configure the list API object version. #6280
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255
* [ENHANCEMENT] Query Frontend: Add new query stats metrics `cortex_query_samples_scanned_total` and `cortex_query_peak_samples` to track scannedSamples and peakSample per user. #6228
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2562,6 +2562,10 @@ ha_tracker:
# CLI flag: -distributor.max-recv-msg-size
[max_recv_msg_size: <int> | default = 104857600]
# Maximum OTLP request size in bytes that the Distributor can accept.
# CLI flag: -distributor.otlp-max-recv-msg-size
[otlp_max_recv_msg_size: <int> | default = 104857600]
# Timeout for downstream ingesters.
# CLI flag: -distributor.remote-timeout
[remote_timeout: <duration> | default = 2s]
2 changes: 1 addition & 1 deletion pkg/api/api.go
@@ -278,7 +278,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
8 changes: 5 additions & 3 deletions pkg/distributor/distributor.go
@@ -135,9 +135,10 @@ type Config struct {

HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

- MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
- RemoteTimeout time.Duration `yaml:"remote_timeout"`
- ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`
+ MaxRecvMsgSize int `yaml:"max_recv_msg_size"`
+ OTLPMaxRecvMsgSize int `yaml:"otlp_max_recv_msg_size"`
+ RemoteTimeout time.Duration `yaml:"remote_timeout"`
+ ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`

ShardingStrategy string `yaml:"sharding_strategy"`
ShardByAllLabels bool `yaml:"shard_by_all_labels"`
@@ -186,6 +187,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.DistributorRing.RegisterFlags(f)

f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "remote_write API max receive message size (bytes).")
f.IntVar(&cfg.OTLPMaxRecvMsgSize, "distributor.otlp-max-recv-msg-size", 100<<20, "Maximum OTLP request size in bytes that the Distributor can accept.")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
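For orientation, here is a minimal, self-contained sketch of the flag-registration pattern this diff extends. The `config` type and `main` below are simplified stand-ins, not the actual cortex package; only the flag names, defaults, and help strings are taken from the diff.

```go
package main

import (
	"flag"
	"fmt"
)

// Simplified stand-in for distributor.Config, mirroring the two size fields above.
type config struct {
	MaxRecvMsgSize     int
	OTLPMaxRecvMsgSize int
}

func (c *config) registerFlags(f *flag.FlagSet) {
	f.IntVar(&c.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "remote_write API max receive message size (bytes).")
	f.IntVar(&c.OTLPMaxRecvMsgSize, "distributor.otlp-max-recv-msg-size", 100<<20, "Maximum OTLP request size in bytes that the Distributor can accept.")
}

func main() {
	var c config
	fs := flag.NewFlagSet("cortex", flag.ContinueOnError)
	c.registerFlags(fs)
	_ = fs.Parse([]string{"-distributor.otlp-max-recv-msg-size=16777216"}) // override to 16 MiB
	fmt.Println(c.OTLPMaxRecvMsgSize)                                      // 16777216; the default 100<<20 equals 104857600 (100 MiB), matching the docs above
}
```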
2 changes: 1 addition & 1 deletion pkg/ruler/api.go
@@ -51,7 +51,7 @@ type Alert struct {
// RuleDiscovery has info for all rules
type RuleDiscovery struct {
RuleGroups []*RuleGroup `json:"groups"`
GroupNextToken string `json:"groupNextToken:omitempty"`
GroupNextToken string `json:"groupNextToken,omitempty"`
}

// RuleGroup has info for rules which are part of a group
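The one-character fix above matters more than it looks: in a Go struct tag, the JSON key is everything before the first comma, so `json:"groupNextToken:omitempty"` names the key literally `groupNextToken:omitempty` and `omitempty` is never applied. A minimal sketch of the difference:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type brokenTag struct {
	// Colon instead of comma: the key becomes "groupNextToken:omitempty",
	// and omitempty is not recognized as a tag option.
	GroupNextToken string `json:"groupNextToken:omitempty"`
}

type fixedTag struct {
	GroupNextToken string `json:"groupNextToken,omitempty"`
}

func main() {
	b, _ := json.Marshal(brokenTag{})
	f, _ := json.Marshal(fixedTag{})
	fmt.Println(string(b)) // {"groupNextToken:omitempty":""}
	fmt.Println(string(f)) // {}
}
```

The serialization test added below exercises exactly this: an empty token must disappear from the response, and a set token must appear under the key `groupNextToken`.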
86 changes: 86 additions & 0 deletions pkg/ruler/api_test.go
@@ -4,11 +4,13 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/gorilla/mux"
@@ -20,6 +22,90 @@ import (
"github.com/cortexproject/cortex/pkg/util/services"
)

func TestAPIResponseSerialization(t *testing.T) {
lastEvalTime := time.Now()
responseTime := lastEvalTime.Format(time.RFC3339Nano)
testCases := map[string]struct {
rules RuleDiscovery
expectedJSON string
}{
"No rules": {
rules: RuleDiscovery{
RuleGroups: make([]*RuleGroup, 0),
},
expectedJSON: `{
"groups":[]
}`,
},
"Rules with no next token": {
rules: RuleDiscovery{
RuleGroups: []*RuleGroup{
{
Name: "Test",
File: "/rules/Test",
Rules: make([]rule, 0),
Interval: 60,
LastEvaluation: lastEvalTime,
EvaluationTime: 10,
Limit: 0,
},
},
},
expectedJSON: fmt.Sprintf(`{
"groups": [
{
"evaluationTime": 10,
"limit": 0,
"name": "Test",
"file": "/rules/Test",
"interval": 60,
"rules": [],
"lastEvaluation": "%s"
}
]
}`, responseTime),
},
"Rules with next token": {
rules: RuleDiscovery{
RuleGroups: []*RuleGroup{
{
Name: "Test",
File: "/rules/Test",
Rules: make([]rule, 0),
Interval: 60,
LastEvaluation: lastEvalTime,
EvaluationTime: 10,
Limit: 0,
},
},
GroupNextToken: "abcdef",
},
expectedJSON: fmt.Sprintf(`{
"groups": [
{
"evaluationTime": 10,
"limit": 0,
"name": "Test",
"file": "/rules/Test",
"interval": 60,
"rules": [],
"lastEvaluation": "%s"
}
],
"groupNextToken": "abcdef"
}`, responseTime),
},
}

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
data, err := json.Marshal(&tc.rules)
require.NoError(t, err)
require.JSONEq(t, tc.expectedJSON, string(data))
})
}
}

func TestRuler_rules(t *testing.T) {
store := newMockRuleStore(mockRules, nil)
cfg := defaultRulerConfig(t)
9 changes: 9 additions & 0 deletions pkg/util/http.go
@@ -2,6 +2,7 @@ package util

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"flag"
@@ -143,6 +144,7 @@ type CompressionType int
const (
NoCompression CompressionType = iota
RawSnappy
Gzip
)

// ParseProtoReader parses a compressed proto from an io.Reader.
@@ -215,6 +217,13 @@ func decompressFromReader(reader io.Reader, expectedSize, maxSize int, compressi
return nil, err
}
body, err = decompressFromBuffer(&buf, maxSize, RawSnappy, sp)
case Gzip:
reader, err = gzip.NewReader(reader)
if err != nil {
return nil, err
}
_, err = buf.ReadFrom(reader)
body = buf.Bytes()
}
return body, err
}
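A standalone sketch of what the new `Gzip` branch does: wrap the body in a `compress/gzip` reader and drain it into a buffer. The explicit `io.LimitReader` bound here is an assumption made for the sketch; in the diff itself, size enforcement comes from the surrounding `ParseProtoReader` plumbing.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gunzipBounded decompresses r, refusing payloads that inflate past maxSize.
func gunzipBounded(r io.Reader, maxSize int64) ([]byte, error) {
	gz, err := gzip.NewReader(r)
	if err != nil {
		return nil, err
	}
	defer gz.Close()

	var buf bytes.Buffer
	// Read one byte past the limit so oversized payloads are detectable.
	if _, err := buf.ReadFrom(io.LimitReader(gz, maxSize+1)); err != nil {
		return nil, err
	}
	if int64(buf.Len()) > maxSize {
		return nil, fmt.Errorf("received message larger than max (%d vs %d)", buf.Len(), maxSize)
	}
	return buf.Bytes(), nil
}

func main() {
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	_, _ = zw.Write([]byte("hello otlp"))
	_ = zw.Close()

	body, err := gunzipBounded(&compressed, 1<<20)
	fmt.Println(string(body), err) // hello otlp <nil>
}
```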
86 changes: 83 additions & 3 deletions pkg/util/push/otlp.go
@@ -1,20 +1,24 @@
package push

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
"github.com/prometheus/prometheus/util/annotations"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor"
@@ -24,8 +28,13 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

const (
pbContentType = "application/x-protobuf"
jsonContentType = "application/json"
)

// OTLPHandler is a http.Handler which accepts OTLP metrics.
-func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
+func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := util_log.WithContext(ctx, util_log.Logger)
@@ -42,7 +51,7 @@ func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, so
return
}

- req, err := remote.DecodeOTLPWriteRequest(r)
+ req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -90,6 +99,64 @@ func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, so
})
}

func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (pmetricotlp.ExportRequest, error) {
expectedSize := int(r.ContentLength)
if expectedSize > maxSize {
return pmetricotlp.NewExportRequest(), fmt.Errorf("received message larger than max (%d vs %d)", expectedSize, maxSize)
}

contentType := r.Header.Get("Content-Type")
contentEncoding := r.Header.Get("Content-Encoding")

var compressionType util.CompressionType
switch contentEncoding {
case "gzip":
compressionType = util.Gzip
case "":
compressionType = util.NoCompression
default:
return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported compression: %s, Supported compression types are \"gzip\" or '' (no compression)", contentEncoding)
}

var decoderFunc func(reader io.Reader) (pmetricotlp.ExportRequest, error)
switch contentType {
case pbContentType:
decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()
otlpReqProto := otlpProtoMessage{req: &req}
return req, util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType)
}
case jsonContentType:
decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
req := pmetricotlp.NewExportRequest()

reader = io.LimitReader(reader, int64(maxSize)+1)
if compressionType == util.Gzip {
var err error
reader, err = gzip.NewReader(reader)
if err != nil {
return req, err
}
}

var buf bytes.Buffer
if expectedSize > 0 {
buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
}
_, err := buf.ReadFrom(reader)
if err != nil {
return req, err
}

return req, req.UnmarshalJSON(buf.Bytes())
}
default:
return pmetricotlp.NewExportRequest(), fmt.Errorf("unsupported content type: %s, supported: [%s, %s]", contentType, jsonContentType, pbContentType)
}

return decoderFunc(r.Body)
}

func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, error) {
promConverter := prometheusremotewrite.NewPrometheusConverter()
settings := prometheusremotewrite.Settings{
@@ -223,3 +290,16 @@ func joinAttributeMaps(from, to pcommon.Map) {
return true
})
}

// otlpProtoMessage implements proto.Message and proto.Unmarshaler.
type otlpProtoMessage struct {
req *pmetricotlp.ExportRequest
}

func (otlpProtoMessage) ProtoMessage() {}

func (otlpProtoMessage) Reset() {}

func (otlpProtoMessage) String() string { return "" }

func (o otlpProtoMessage) Unmarshal(data []byte) error { return o.req.UnmarshalProto(data) }
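The `otlpProtoMessage` shim exists because `util.ParseProtoReader` expects a proto-shaped value it can call `Unmarshal` on, while `pmetricotlp.ExportRequest` only exposes `UnmarshalProto`. Below is a simplified, self-contained sketch of that adapter pattern; the interface and types are illustrative stand-ins, not the real signatures (the real code also stubs out the no-op `proto.Message` methods shown above).

```go
package main

import "fmt"

// Stand-in for the unmarshaling contract ParseProtoReader relies on.
type protoUnmarshaler interface {
	Unmarshal(data []byte) error
}

// Stand-in for pmetricotlp.ExportRequest, which decodes via UnmarshalProto.
type exportRequest struct{ payload string }

func (r *exportRequest) UnmarshalProto(data []byte) error {
	r.payload = string(data)
	return nil
}

// Adapter in the style of otlpProtoMessage: hold a pointer to the request
// and forward Unmarshal to UnmarshalProto, so a non-proto type can pass
// through proto-oriented plumbing unchanged.
type otlpAdapter struct{ req *exportRequest }

func (a otlpAdapter) Unmarshal(data []byte) error { return a.req.UnmarshalProto(data) }

func main() {
	req := &exportRequest{}
	var u protoUnmarshaler = otlpAdapter{req: req}
	_ = u.Unmarshal([]byte("raw otlp protobuf bytes"))
	fmt.Println(req.payload) // raw otlp protobuf bytes
}
```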