Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ruler: add ability to remote_write directly #6367

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1805,3 +1805,77 @@ func createTestRuleGroup(t *testing.T) rulefmt.RuleGroup {
},
}
}

// TestRulerEvalWithQueryFrontendAndRemoteWrite starts a ruler configured with
// both -ruler.frontend-address and -ruler.remote-write-url, loads a single rule
// group and then checks, via the ruler's own metrics, that rule queries and
// write requests were issued and that none of them failed.
func TestRulerEvalWithQueryFrontendAndRemoteWrite(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	// Configure the ruler.
	flags := mergeFlags(
		BlocksStorageFlags(),
		RulerFlags(),
		map[string]string{
			// Evaluate rules often, so that we don't need to wait for metrics to show up.
			"-ruler.evaluation-interval": "2s",
			// We run single ingester only, no replication.
			"-distributor.replication-factor": "1",
			"-log.level":                      "debug",
		},
	)

	const namespace = "test"
	const user = "user"

	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	require.NoError(t, s.StartAndWaitReady(distributor, ingester))
	// The query frontend is only started here, not waited on; the querier that
	// connects to it is started further down.
	queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
	require.NoError(t, s.Start(queryFrontend))

	// The remote-write headers can only be set through the config file (there is
	// no flag for them). The YAML must be nested under `ruler:` and indented with
	// spaces: the continuation lines of the raw string start at column 0 so that
	// no tab from the Go source leaks into the document.
	require.NoError(t, writeFileToSharedDir(s, "rulercfg.yml", []byte(`ruler:
  remote_write:
    headers:
      X-Scope-OrgID: "test-org-id"`)))

	ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
		"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
		"-ruler.remote-write-url": fmt.Sprintf("http://%s/api/v1/push", distributor.NetworkEndpoint(80)),
		"-config.file":            filepath.Join(e2e.ContainerSharedDir, "rulercfg.yml"),
	}), "")
	querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
		"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
	}), "")
	require.NoError(t, s.StartAndWaitReady(ruler, querier))

	c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
	require.NoError(t, err)

	expression := "metric"
	groupName := "rule_group"
	ruleName := "rule_name"
	require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))

	rgMatcher := ruleGroupMatcher(user, namespace, groupName)
	// Wait until ruler has loaded the group.
	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
	// Wait until rule group has tried to evaluate the rule.
	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))

	matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
	// Check that cortex_ruler_query_frontend_clients went up
	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
	// Check that cortex_ruler_queries_total went up
	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
	// Check that cortex_ruler_queries_failed_total is zero
	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
	// Check that cortex_ruler_write_requests_total went up
	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
	// Check that cortex_ruler_write_requests_failed_total is zero
	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
}
8 changes: 6 additions & 2 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,11 +613,15 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
} else {
var pusher ruler.Pusher = t.Distributor
if t.Cfg.Ruler.RemoteWriteConfig.URL != "" {
pusher = ruler.NewRemoteWritePusher(t.Cfg.Ruler.RemoteWriteConfig.URL, t.Cfg.Ruler.RemoteWriteConfig.Headers)
}
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, pusher, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
}

Expand Down Expand Up @@ -821,7 +825,7 @@ func (t *Cortex) setupModuleManager() error {
TenantFederation: {Queryable},
All: {QueryFrontend, Querier, Ingester, Distributor, Purger, StoreGateway, Ruler, Compactor, AlertManager},
}
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
if (t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil) || (t.Cfg.Ruler.FrontendAddress != "" && t.Cfg.Ruler.RemoteWriteConfig.URL != "") {
deps[Ruler] = []string{Overrides, RulerStorage}
}
for mod, targets := range deps {
Expand Down
125 changes: 125 additions & 0 deletions pkg/ruler/remote_write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package ruler

import (
"bytes"
"context"
"fmt"
"io"
"net/http"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/klauspost/compress/snappy"
"github.com/prometheus/prometheus/prompb"
)

// RemoteWritePusher is a Pusher that forwards write requests over HTTP to a
// remote-write endpoint instead of handing them to an in-process component.
type RemoteWritePusher struct {
// u is the URL every request is POSTed to.
u string
// headers are set on every outgoing request (see Push).
headers map[string]string
}

// NewRemoteWritePusher builds a RemoteWritePusher that sends to u and attaches
// headers to every request. The headers map is stored as given, not copied.
func NewRemoteWritePusher(u string, headers map[string]string) *RemoteWritePusher {
	pusher := new(RemoteWritePusher)
	pusher.u = u
	pusher.headers = headers
	return pusher
}

// Compile-time assertion that *RemoteWritePusher satisfies the Pusher interface.
var _ Pusher = &RemoteWritePusher{}

// Push converts wr into a Prometheus remote-write request, snappy-compresses
// the marshaled protobuf and POSTs it to the configured URL.
//
// Labels, samples, exemplars and metric metadata are forwarded; histograms are
// not converted yet and are therefore dropped. The standard remote-write
// protocol headers are set first, then the configured headers are applied on
// top so that they can override the defaults.
//
// ctx bounds the HTTP request. It returns an empty WriteResponse when the
// endpoint answers with a 2xx status, and an error otherwise; for non-2xx
// responses the error includes the status code and the beginning of the
// response body.
//
// NOTE(review): the headers are static, so a configured X-Scope-OrgID applies to
// every tenant's rules; nothing tenant-specific is taken from ctx here.
func (r *RemoteWritePusher) Push(ctx context.Context, wr *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
	promwr := &prompb.WriteRequest{
		Timeseries: make([]prompb.TimeSeries, 0, len(wr.Timeseries)),
		Metadata:   make([]prompb.MetricMetadata, 0, len(wr.Metadata)),
	}

	for _, ts := range wr.Timeseries {
		promwr.Timeseries = append(promwr.Timeseries, prompb.TimeSeries{
			Labels:    makeLabels(ts.Labels),
			Samples:   makeSamples(ts.Samples),
			Exemplars: makeExemplars(ts.Exemplars),
			//Histograms: makeHistograms(ts.Histograms),
		})
	}

	for _, m := range wr.Metadata {
		promwr.Metadata = append(promwr.Metadata, prompb.MetricMetadata{
			Type:             prompb.MetricMetadata_MetricType(m.Type),
			Unit:             m.Unit,
			Help:             m.Help,
			MetricFamilyName: m.MetricFamilyName,
		})
	}

	payload, err := promwr.Marshal()
	if err != nil {
		return nil, fmt.Errorf("marshaling remote write request: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.u, bytes.NewReader(snappy.Encode(nil, payload)))
	if err != nil {
		return nil, fmt.Errorf("building remote write request: %w", err)
	}

	// Headers required by the remote-write protocol. They are set before the
	// user-supplied ones so configuration can still override them.
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
	for k, v := range r.headers {
		req.Header.Set(k, v)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("sending remote write request: %w", err)
	}
	defer func() {
		// Drain whatever is left so the transport can reuse the connection.
		// Failures here are not actionable, so they are deliberately ignored.
		_, _ = io.Copy(io.Discard, resp.Body)
		_ = resp.Body.Close()
	}()

	if resp.StatusCode/100 != 2 {
		// Keep only a bounded prefix of the body: enough to explain the
		// rejection without letting a misbehaving server bloat the error.
		detail, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
		if detail = bytes.TrimSpace(detail); len(detail) > 0 {
			return nil, fmt.Errorf("got status code: %d: %s", resp.StatusCode, detail)
		}
		return nil, fmt.Errorf("got status code: %d", resp.StatusCode)
	}

	return &cortexpb.WriteResponse{}, nil
}

// makeLabels converts Cortex label adapters into Prometheus remote-write
// labels, keeping the input order. The result is always a non-nil slice.
func makeLabels(in []cortexpb.LabelAdapter) []prompb.Label {
	converted := make([]prompb.Label, len(in))
	for i := range in {
		converted[i] = prompb.Label{
			Name:  in[i].Name,
			Value: in[i].Value,
		}
	}
	return converted
}

// makeSamples converts Cortex samples into Prometheus remote-write samples,
// copying the value and mapping TimestampMs onto Timestamp. The result is
// always a non-nil slice in the input order.
func makeSamples(in []cortexpb.Sample) []prompb.Sample {
	converted := make([]prompb.Sample, len(in))
	for i := range in {
		converted[i].Value = in[i].Value
		converted[i].Timestamp = in[i].TimestampMs
	}
	return converted
}

// makeExemplars converts Cortex exemplars into Prometheus remote-write
// exemplars: labels go through makeLabels, the value is copied and TimestampMs
// is mapped onto Timestamp. The result is always a non-nil slice in the input
// order.
func makeExemplars(in []cortexpb.Exemplar) []prompb.Exemplar {
	converted := make([]prompb.Exemplar, len(in))
	for i := range in {
		src := in[i]
		converted[i] = prompb.Exemplar{
			Labels:    makeLabels(src.Labels),
			Value:     src.Value,
			Timestamp: src.TimestampMs,
		}
	}
	return converted
}

/*
func makeHistograms(in []cortexpb.Histogram) []prompb.Histogram {
out := make([]prompb.Histogram, 0, len(in))
for _, h := range in {
out = append(out, cortexpb.HistogramPromProtoToHistogramProto(h))
}
return out
}
*/
10 changes: 10 additions & 0 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ func (e *DisabledRuleGroupErr) Error() string {
return e.Message
}

// RemoteWriteConfig holds the ruler's remote-write settings (YAML key
// `remote_write` on the ruler Config).
type RemoteWriteConfig struct {
// URL is the remote-write endpoint to send samples to; it is also settable
// through the -ruler.remote-write-url flag. Empty means remote write is not used.
URL string `yaml:"url"`
// Headers are handed to the remote-write pusher together with URL. There is
// no command-line flag for them, so they can only come from the YAML config.
Headers map[string]string `yaml:"headers"`
}

// Config is the configuration for the recording rules server.
type Config struct {
// This is used for query to query frontend to evaluate rules
Expand All @@ -113,6 +118,10 @@ type Config struct {
// Path to store rule files for prom manager.
RulePath string `yaml:"rule_path"`

// Configuration for remote_write. If this is configured then
// Ruler only writes to this address.
RemoteWriteConfig RemoteWriteConfig `yaml:"remote_write"`

// URL of the Alertmanager to send notifications to.
// If you are configuring the ruler to send to a Cortex Alertmanager,
// ensure this includes any path set in the Alertmanager external URL.
Expand Down Expand Up @@ -217,6 +226,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.AlertmanagerRefreshInterval, "ruler.alertmanager-refresh-interval", 1*time.Minute, "How long to wait between refreshing DNS resolutions of Alertmanager hosts.")
f.IntVar(&cfg.NotificationQueueCapacity, "ruler.notification-queue-capacity", 10000, "Capacity of the queue for notifications to be sent to the Alertmanager.")
f.DurationVar(&cfg.NotificationTimeout, "ruler.notification-timeout", 10*time.Second, "HTTP timeout duration when sending notifications to the Alertmanager.")
f.StringVar(&cfg.RemoteWriteConfig.URL, "ruler.remote-write-url", "", "URL of the remote write endpoint to send samples to.")

f.DurationVar(&cfg.SearchPendingFor, "ruler.search-pending-for", 5*time.Minute, "Time to spend searching for a pending ruler when shutting down.")
f.BoolVar(&cfg.EnableSharding, "ruler.enable-sharding", false, "Distribute rule evaluation using ring backend")
Expand Down