From 80a8417814f04ff05fe96fdb106f3572d5877620 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 22 Nov 2024 11:25:30 +0200 Subject: [PATCH] ruler: add ability to remote_write directly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add ability to use remote_write directly. This makes the component more general and makes it possible to use it with Thanos. Signed-off-by: Giedrius Statkevičius --- integration/ruler_test.go | 74 ++++++++++++++++++++++ pkg/cortex/modules.go | 8 ++- pkg/ruler/remote_write.go | 125 ++++++++++++++++++++++++++++++++++++++ pkg/ruler/ruler.go | 10 +++ 4 files changed, 215 insertions(+), 2 deletions(-) create mode 100644 pkg/ruler/remote_write.go diff --git a/integration/ruler_test.go b/integration/ruler_test.go index f7d16507d1..8336b7431c 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -1805,3 +1805,77 @@ func createTestRuleGroup(t *testing.T) rulefmt.RuleGroup { }, } } + +func TestRulerEvalWithQueryFrontendAndRemoteWrite(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Configure the ruler. + flags := mergeFlags( + BlocksStorageFlags(), + RulerFlags(), + map[string]string{ + // Evaluate rules often, so that we don't need to wait for metrics to show up. + "-ruler.evaluation-interval": "2s", + // We run single ingester only, no replication. + "-distributor.replication-factor": "1", + "-log.level": "debug", + }, + ) + + const namespace = "test" + const user = "user" + + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester)) + queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + require.NoError(t, writeFileToSharedDir(s, "rulercfg.yml", []byte(`ruler: + remote_write: + headers: + X-Scope-OrgID: "test-org-id"`))) + + ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + "-ruler.remote-write-url": fmt.Sprintf("http://%s/api/v1/push", distributor.NetworkEndpoint(80)), + "-config.file": filepath.Join(e2e.ContainerSharedDir, "rulercfg.yml"), + }), "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(ruler, querier)) + + c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user) + require.NoError(t, err) + + expression := "metric" + groupName := "rule_group" + ruleName := "rule_name" + require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace)) + + rgMatcher := ruleGroupMatcher(user, namespace, groupName) + // Wait until ruler has loaded the group. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + // Wait until rule group has tried to evaluate the rule. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics)) + + matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user) + // Check that cortex_ruler_query_frontend_clients went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics)) + // Check that cortex_ruler_queries_total went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_queries_failed_total is zero + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_write_requests_total went up + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) + // Check that cortex_ruler_write_requests_failed_total is zero + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics)) +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 00f0b10a20..f051fb2c10 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -613,11 +613,15 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer) manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) } else { + var pusher ruler.Pusher = t.Distributor + if t.Cfg.Ruler.RemoteWriteConfig.URL != "" { + pusher = ruler.NewRemoteWritePusher(t.Cfg.Ruler.RemoteWriteConfig.URL, t.Cfg.Ruler.RemoteWriteConfig.Headers) + } rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer) // TODO: Consider wrapping logger to differentiate from querier module logger queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger) - managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer) + managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, pusher, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer) manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) } @@ -821,7 +825,7 @@ func (t *Cortex) setupModuleManager() error { TenantFederation: {Queryable}, All: {QueryFrontend, Querier, Ingester, Distributor, Purger, StoreGateway, Ruler, Compactor, AlertManager}, } - if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil { + if (t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil) || (t.Cfg.Ruler.FrontendAddress != "" && t.Cfg.Ruler.RemoteWriteConfig.URL != "") { deps[Ruler] = []string{Overrides, RulerStorage} } for mod, targets := range deps { diff --git a/pkg/ruler/remote_write.go b/pkg/ruler/remote_write.go new file mode 100644 index 0000000000..8a96fdac8f --- /dev/null +++ b/pkg/ruler/remote_write.go @@ -0,0 +1,125 @@ +package ruler + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/klauspost/compress/snappy" + "github.com/prometheus/prometheus/prompb" +) + +type RemoteWritePusher struct { + u string + headers map[string]string +} + +func NewRemoteWritePusher(u string, headers map[string]string) *RemoteWritePusher { + return &RemoteWritePusher{ + u: u, + headers: headers, + } +} + +var _ Pusher = &RemoteWritePusher{} + +func (r *RemoteWritePusher) Push(ctx context.Context, wr *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { + promwr := &prompb.WriteRequest{ + Timeseries: make([]prompb.TimeSeries, 0, len(wr.Timeseries)), + Metadata: make([]prompb.MetricMetadata, 0, len(wr.Metadata)), + } + + for _, ts := range wr.Timeseries { + promwr.Timeseries = append(promwr.Timeseries, prompb.TimeSeries{ + Labels: makeLabels(ts.Labels), + Samples: makeSamples(ts.Samples), + Exemplars: makeExemplars(ts.Exemplars), + //Histograms: makeHistograms(ts.Histograms), + }) + } + + for _, m := range wr.Metadata { + promwr.Metadata = append(promwr.Metadata, prompb.MetricMetadata{ + Type: prompb.MetricMetadata_MetricType(m.Type), + Unit: m.Unit, + Help: m.Help, + MetricFamilyName: m.MetricFamilyName, + }) + } + + m, err := promwr.Marshal() + if err != nil { + return nil, err + } + + encoded := snappy.Encode(nil, m) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.u, bytes.NewReader(encoded)) + if err != nil { + return nil, err + } + + for k, v := range r.headers { + req.Header.Set(k, v) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + + if resp.Body != nil { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("got status code: %d", resp.StatusCode) + } + + return &cortexpb.WriteResponse{}, nil +} + +func makeLabels(in []cortexpb.LabelAdapter) []prompb.Label { + out := make([]prompb.Label, 0, len(in)) + for _, l := range in { + out = append(out, prompb.Label{Name: l.Name, Value: l.Value}) + } + return out +} + +func makeSamples(in []cortexpb.Sample) []prompb.Sample { + out := make([]prompb.Sample, 0, len(in)) + for _, s := range in { + out = append(out, prompb.Sample{ + Value: s.Value, + Timestamp: s.TimestampMs, + }) + } + return out +} + +func makeExemplars(in []cortexpb.Exemplar) []prompb.Exemplar { + out := make([]prompb.Exemplar, 0, len(in)) + for _, e := range in { + out = append(out, prompb.Exemplar{ + Labels: makeLabels(e.Labels), + Value: e.Value, + Timestamp: e.TimestampMs, + }) + } + return out +} + +/* +func makeHistograms(in []cortexpb.Histogram) []prompb.Histogram { + out := make([]prompb.Histogram, 0, len(in)) + for _, h := range in { + out = append(out, cortexpb.HistogramPromProtoToHistogramProto(h)) + } + return out +} +*/ diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index d77b4d0a41..b035cf33ae 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -92,6 +92,11 @@ func (e *DisabledRuleGroupErr) Error() string { return e.Message } +type RemoteWriteConfig struct { + URL string `yaml:"url"` + Headers map[string]string `yaml:"headers"` +} + // Config is the configuration for the recording rules server. type Config struct { // This is used for query to query frontend to evaluate rules @@ -113,6 +118,10 @@ type Config struct { // Path to store rule files for prom manager. RulePath string `yaml:"rule_path"` + // Configuration for remote_write. If this is configured then + // Ruler only writes to this address. + RemoteWriteConfig RemoteWriteConfig `yaml:"remote_write"` + // URL of the Alertmanager to send notifications to. // If you are configuring the ruler to send to a Cortex Alertmanager, // ensure this includes any path set in the Alertmanager external URL. @@ -217,6 +226,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.AlertmanagerRefreshInterval, "ruler.alertmanager-refresh-interval", 1*time.Minute, "How long to wait between refreshing DNS resolutions of Alertmanager hosts.") f.IntVar(&cfg.NotificationQueueCapacity, "ruler.notification-queue-capacity", 10000, "Capacity of the queue for notifications to be sent to the Alertmanager.") f.DurationVar(&cfg.NotificationTimeout, "ruler.notification-timeout", 10*time.Second, "HTTP timeout duration when sending notifications to the Alertmanager.") + f.StringVar(&cfg.RemoteWriteConfig.URL, "ruler.remote-write-url", "", "URL of the remote write endpoint to send samples to.") f.DurationVar(&cfg.SearchPendingFor, "ruler.search-pending-for", 5*time.Minute, "Time to spend searching for a pending ruler when shutting down.") f.BoolVar(&cfg.EnableSharding, "ruler.enable-sharding", false, "Distribute rule evaluation using ring backend")