From d4bd0f2ef8742c691b895d9846984d056bfffad9 Mon Sep 17 00:00:00 2001 From: kane8n Date: Mon, 18 Nov 2024 22:47:16 +0900 Subject: [PATCH] add splunk provider Signed-off-by: kane8n --- artifacts/flagger/crd.yaml | 1 + charts/flagger/crds/crd.yaml | 1 + docs/gitbook/usage/metrics.md | 51 +++++++ go.mod | 3 + go.sum | 6 + kustomize/base/flagger/crd.yaml | 1 + pkg/metrics/providers/factory.go | 2 + pkg/metrics/providers/splunk.go | 195 +++++++++++++++++++++++++++ pkg/metrics/providers/splunk_test.go | 188 ++++++++++++++++++++++++++ 9 files changed, 448 insertions(+) create mode 100644 pkg/metrics/providers/splunk.go create mode 100644 pkg/metrics/providers/splunk_test.go diff --git a/artifacts/flagger/crd.yaml b/artifacts/flagger/crd.yaml index 39a466dd2..145eda608 100644 --- a/artifacts/flagger/crd.yaml +++ b/artifacts/flagger/crd.yaml @@ -1304,6 +1304,7 @@ spec: - graphite - dynatrace - keptn + - splunk address: description: API address of this provider type: string diff --git a/charts/flagger/crds/crd.yaml b/charts/flagger/crds/crd.yaml index 39a466dd2..145eda608 100644 --- a/charts/flagger/crds/crd.yaml +++ b/charts/flagger/crds/crd.yaml @@ -1304,6 +1304,7 @@ spec: - graphite - dynatrace - keptn + - splunk address: description: API address of this provider type: string diff --git a/docs/gitbook/usage/metrics.md b/docs/gitbook/usage/metrics.md index c51fc61c2..58b243ef0 100644 --- a/docs/gitbook/usage/metrics.md +++ b/docs/gitbook/usage/metrics.md @@ -730,3 +730,54 @@ Only relevant if the `type` is set to `analysis`. For the type `analysis`, the value returned by the provider is either `0` (if the analysis failed), or `1` (analysis passed). + +## Splunk + +You can create custom metric checks using the Splunk provider. + +Create a secret that contains your authentication token that can be found in the Splunk o11y UI. 
+ +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: splunk + namespace: istio-system +data: + sf_token_key: your-access-token +``` + +Splunk template example: + +```yaml +apiVersion: flagger.app/v1beta1 +kind: MetricTemplate +metadata: + name: success-rate + namespace: istio-system +spec: + provider: + type: splunk + address: https://api.<REALM>.signalfx.com + secretRef: + name: splunk + query: | + total = data('traces.count', filter=filter('sf_service', '{{target}}')).sum().publish(enable=False) + success = data('traces.count', filter=filter('sf_service', '{{target}}') and filter('sf_error', 'false')).sum().publish(enable=False) + ((success/total) * 100).publish() +``` +The query format documentation can be found [here](https://dev.splunk.com/observability/docs/signalflow). + +Reference the template in the canary analysis: + +```yaml + analysis: + metrics: + - name: "success rate" + templateRef: + name: success-rate + namespace: istio-system + thresholdRange: + min: 99 + interval: 1m +``` diff --git a/go.mod b/go.mod index 727ec4fb0..46e52c6fd 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,8 @@ require ( github.com/hashicorp/go-retryablehttp v0.7.7 github.com/influxdata/influxdb-client-go/v2 v2.13.0 github.com/prometheus/client_golang v1.20.5 + github.com/signalfx/signalflow-client-go v0.1.0 + github.com/signalfx/signalfx-go v1.34.0 github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.27.0 golang.org/x/sync v0.9.0 @@ -50,6 +52,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/s2a-go v0.1.8 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect + github.com/gorilla/websocket v1.5.1 // indirect github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/imdario/mergo v0.3.15 // indirect diff --git a/go.sum b/go.sum index 15ad47023..7393d0817 100644 --- a/go.sum +++ b/go.sum @@ -99,6 +99,8 @@ 
github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gT github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= github.com/googleapis/gax-go/v2 v2.14.0 h1:f+jMrjBPl+DL9nI4IQzLUxMq7XrAqFYB7hBPqMNIe8o= github.com/googleapis/gax-go/v2 v2.14.0/go.mod h1:lhBCnjdLrWRaPvLWhmc8IS24m9mr07qSYnHncrgo+zk= +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= @@ -172,6 +174,10 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/signalfx/signalflow-client-go v0.1.0 h1:aqyt+st3/y8x8JtuwYRL9pOkOTJb+KeCoRWi0SuY5vw= +github.com/signalfx/signalflow-client-go v0.1.0/go.mod h1:mY4DTAZuLHyMNGBjSrNdCg5kUU0hSkYjukAnjsVbsQs= +github.com/signalfx/signalfx-go v1.34.0 h1:OQ6tyMY4efWB57EPIQqrpWrAfcSdyfa+bLtmAe7GLfE= +github.com/signalfx/signalfx-go v1.34.0/go.mod h1:IpGZLPvCKNFyspAXoS480jB02mocTpo0KYd8jbl6/T8= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= diff --git a/kustomize/base/flagger/crd.yaml b/kustomize/base/flagger/crd.yaml index 
39a466dd2..145eda608 100644 --- a/kustomize/base/flagger/crd.yaml +++ b/kustomize/base/flagger/crd.yaml @@ -1304,6 +1304,7 @@ spec: - graphite - dynatrace - keptn + - splunk address: description: API address of this provider type: string diff --git a/pkg/metrics/providers/factory.go b/pkg/metrics/providers/factory.go index 2370d7e76..e49e44c56 100644 --- a/pkg/metrics/providers/factory.go +++ b/pkg/metrics/providers/factory.go @@ -43,6 +43,8 @@ func (factory Factory) Provider(metricInterval string, provider flaggerv1.Metric return NewDynatraceProvider(metricInterval, provider, credentials) case "keptn": return NewKeptnProvider(config) + case "splunk": + return NewSplunkProvider(metricInterval, provider, credentials) default: return NewPrometheusProvider(provider, credentials) } diff --git a/pkg/metrics/providers/splunk.go b/pkg/metrics/providers/splunk.go new file mode 100644 index 000000000..7540ed698 --- /dev/null +++ b/pkg/metrics/providers/splunk.go @@ -0,0 +1,195 @@ +/* +Copyright 2024 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package providers
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"slices"
+	"strings"
+	"time"
+
+	"github.com/signalfx/signalflow-client-go/signalflow"
+	"github.com/signalfx/signalflow-client-go/signalflow/messages"
+
+	flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
+)
+
+// https://dev.splunk.com/observability/reference
+const (
+	// Path appended to the stream address to reach the SignalFlow websocket API.
+	signalFxSignalFlowApiPath = "/v2/signalflow"
+	// Path requested by IsOnline to check that the API answers with the given token.
+	signalFxValidationPath = "/v2/metric?limit=1"
+
+	// Key of the access token inside the credentials secret.
+	signalFxTokenSecretKey = "sf_token_key"
+
+	// HTTP header that carries the access token on REST requests.
+	signalFxTokenHeaderKey = "X-SF-Token"
+
+	// The query window reaches back this many metric intervals from now.
+	signalFxFromDeltaMultiplierOnMetricInterval = 10
+)
+
+// SplunkProvider executes signalfx queries
+type SplunkProvider struct {
+	// websocket URL the SignalFlow client connects to
+	metricsQueryEndpoint string
+	// REST URL requested by IsOnline
+	apiValidationEndpoint string
+
+	// upper bound for one query execution and for one IsOnline request
+	timeout time.Duration
+	// access token read from the credentials secret
+	token string
+	// length of the query window in milliseconds
+	fromDelta int64
+}
+
+// splunkResponse is an empty placeholder type.
+// NOTE(review): nothing in this file references it; confirm whether it can be removed.
+type splunkResponse struct {
+}
+
+// NewSplunkProvider takes a canary spec, a provider spec and the credentials map, and
+// returns a Splunk client ready to execute queries against the API.
+//
+// It returns an error when the provider address is empty, when the credentials
+// do not contain sf_token_key, or when metricInterval is not a valid duration.
+func NewSplunkProvider(metricInterval string,
+	provider flaggerv1.MetricTemplateProvider,
+	credentials map[string][]byte) (*SplunkProvider, error) {
+
+	// Drop one trailing slash so that appending the API paths cannot yield "//v2/...".
+	address := strings.TrimSuffix(provider.Address, "/")
+	if address == "" {
+		return nil, fmt.Errorf("splunk endpoint is not set")
+	}
+
+	token, ok := credentials[signalFxTokenSecretKey]
+	if !ok {
+		return nil, fmt.Errorf("splunk credentials does not contain sf_token_key")
+	}
+
+	md, err := time.ParseDuration(metricInterval)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing metric interval: %w", err)
+	}
+
+	return &SplunkProvider{
+		timeout: 5 * time.Second,
+		token:   string(token),
+		// Convert the configured address to match the protocol of the respective API
+		// ex.
+		//   https://api.<REALM>.signalfx.com -> wss://stream.<REALM>.signalfx.com
+		//   wss://stream.<REALM>.signalfx.com -> wss://stream.<REALM>.signalfx.com
+		metricsQueryEndpoint: convertSplunkAddress(address, "http", "ws", "api.", "stream.") + signalFxSignalFlowApiPath,
+		// ex.
+		//   https://api.<REALM>.signalfx.com -> https://api.<REALM>.signalfx.com
+		//   wss://stream.<REALM>.signalfx.com -> https://api.<REALM>.signalfx.com
+		apiValidationEndpoint: convertSplunkAddress(address, "ws", "http", "stream.", "api.") + signalFxValidationPath,
+		fromDelta:             signalFxFromDeltaMultiplierOnMetricInterval * md.Milliseconds(),
+	}, nil
+}
+
+// convertSplunkAddress rewrites the scheme and the leading host label of address.
+// The scheme fromScheme becomes toScheme and its TLS variant (fromScheme+"s")
+// becomes toScheme+"s"; a host that starts with fromHost gets that prefix
+// replaced by toHost. Only these two positions are touched, so substrings such
+// as "ws" or "api" elsewhere in the host are left alone. An address without
+// "://" is returned unchanged.
+func convertSplunkAddress(address, fromScheme, toScheme, fromHost, toHost string) string {
+	scheme, rest, ok := strings.Cut(address, "://")
+	if !ok {
+		return address
+	}
+
+	switch scheme {
+	case fromScheme:
+		scheme = toScheme
+	case fromScheme + "s":
+		scheme = toScheme + "s"
+	}
+
+	if after, found := strings.CutPrefix(rest, fromHost); found {
+		rest = toHost + after
+	}
+
+	return scheme + "://" + rest
+}
+
+// RunQuery executes the SignalFlow program over the window [now-fromDelta, now]
+// and converts the most recent data point to float64.
+//
+// It returns ErrNoValuesFound when no payload carries a value and
+// ErrMultipleValuesReturned when the payloads belong to more than one time series.
+func (p *SplunkProvider) RunQuery(query string) (float64, error) {
+	c, err := signalflow.NewClient(signalflow.StreamURL(p.metricsQueryEndpoint), signalflow.AccessToken(p.token))
+	if err != nil {
+		return 0, fmt.Errorf("error creating signalflow client: %w", err)
+	}
+	// A new client is created on every call, so it has to be closed here;
+	// otherwise each query leaves a connection behind.
+	defer func() {
+		closeCtx, closeCancel := context.WithTimeout(context.Background(), p.timeout)
+		defer closeCancel()
+		c.Close(closeCtx)
+	}()
+
+	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
+	defer cancel()
+
+	now := time.Now().UnixMilli()
+	comp, err := c.Execute(ctx, &signalflow.ExecuteRequest{
+		Program:   query,
+		Start:     time.UnixMilli(now - p.fromDelta),
+		Stop:      time.UnixMilli(now),
+		Immediate: true,
+	})
+	if err != nil {
+		return 0, fmt.Errorf("error executing query: %w", err)
+	}
+
+	payloads := p.receivePayloads(comp)
+
+	if err := comp.Err(); err != nil {
+		return 0, fmt.Errorf("error executing query: %w", err)
+	}
+
+	// Payloads without a value cannot be converted, drop them first.
+	payloads = slices.DeleteFunc(payloads, func(msg messages.DataPayload) bool {
+		return msg.Value() == nil
+	})
+	if len(payloads) == 0 {
+		return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound)
+	}
+
+	// Error when a SignalFlow query returns two or more results.
+	// Each published time series has its own TSID, so any payload whose TSID
+	// differs from the first one means the program produced several series.
+	tsid := payloads[0].TSID
+	if slices.ContainsFunc(payloads, func(msg messages.DataPayload) bool { return msg.TSID != tsid }) {
+		return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned)
+	}
+
+	// Payloads are kept in arrival order; use the last one received.
+	payload := payloads[len(payloads)-1]
+	switch payload.Type {
+	case messages.ValTypeLong:
+		return float64(payload.Int64()), nil
+	case messages.ValTypeDouble:
+		return payload.Float64(), nil
+	case messages.ValTypeInt:
+		return float64(payload.Int32()), nil
+	default:
+		return 0, fmt.Errorf("invalid response: UnsupportedValueType")
+	}
+}
+
+// receivePayloads reads the computation's data channel until it is closed and
+// returns the payloads of all non-nil data messages in arrival order.
+func (p *SplunkProvider) receivePayloads(comp *signalflow.Computation) []messages.DataPayload {
+	var payloads []messages.DataPayload
+	for dataMsg := range comp.Data() {
+		if dataMsg == nil {
+			continue
+		}
+		payloads = append(payloads, dataMsg.Payloads...)
+	}
+	return payloads
+}
+
+// IsOnline calls the provider endpoint and returns an error if the API is unreachable
+// or answers with a status other than 200; the response body is included in that error.
+func (p *SplunkProvider) IsOnline() (bool, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
+	defer cancel()
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, p.apiValidationEndpoint, nil)
+	if err != nil {
+		return false, fmt.Errorf("error http.NewRequest: %w", err)
+	}
+	req.Header.Add(signalFxTokenHeaderKey, p.token)
+
+	r, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return false, fmt.Errorf("request failed: %w", err)
+	}
+	defer r.Body.Close()
+
+	b, err := io.ReadAll(r.Body)
+	if err != nil {
+		return false, fmt.Errorf("error reading body: %w", err)
+	}
+
+	if r.StatusCode != http.StatusOK {
+		return false, fmt.Errorf("error response: %s", string(b))
+	}
+
+	return true, nil
+}
diff --git a/pkg/metrics/providers/splunk_test.go b/pkg/metrics/providers/splunk_test.go
new file mode 100644
index 000000000..acf15b53d
--- /dev/null
+++ b/pkg/metrics/providers/splunk_test.go
@@ -0,0
+1,188 @@
+/*
+Copyright 2024 The Flux authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package providers
+
+import (
+	"errors"
+	"fmt"
+	"math/rand"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"testing"
+	"time"
+
+	"github.com/signalfx/signalflow-client-go/signalflow"
+	"github.com/signalfx/signalflow-client-go/signalflow/messages"
+	"github.com/signalfx/signalfx-go/idtool"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
+)
+
+// TestNewSplunkProvider checks that the constructor, given an https API address,
+// derives the wss stream endpoint and the https validation endpoint, sets
+// fromDelta to the metric interval in milliseconds times the multiplier, and
+// stores the token from the credentials map.
+func TestNewSplunkProvider(t *testing.T) {
+	token := "token"
+	cs := map[string][]byte{
+		signalFxTokenSecretKey: []byte(token),
+	}
+
+	mi := "100s"
+	md, err := time.ParseDuration(mi)
+	require.NoError(t, err)
+
+	sp, err := NewSplunkProvider("100s", flaggerv1.MetricTemplateProvider{Address: "https://api.us1.signalfx.com"}, cs)
+	require.NoError(t, err)
+	assert.Equal(t, "wss://stream.us1.signalfx.com/v2/signalflow", sp.metricsQueryEndpoint)
+	assert.Equal(t, "https://api.us1.signalfx.com/v2/metric?limit=1", sp.apiValidationEndpoint)
+	assert.Equal(t, int64(md.Milliseconds()*signalFxFromDeltaMultiplierOnMetricInterval), sp.fromDelta)
+	assert.Equal(t, token, sp.token)
+}
+
+// TestSplunkProvider_RunQuery runs RunQuery against the signalflow fake backend
+// for three cases: one time series with data, time series without data, and
+// several time series with data.
+func TestSplunkProvider_RunQuery(t *testing.T) {
+	// A single TSID with float data 1 is registered; RunQuery must return 1.
+	t.Run("ok", func(t *testing.T) {
+		fakeBackend := signalflow.NewRunningFakeBackend()
+		// Stop the fake backend three seconds after start.
+		// NOTE(review): presumably this is what ends the data stream so that
+		// RunQuery returns before the provider's 5s timeout; confirm.
+		go func() {
+			<-time.After(3 * time.Second)
+			fakeBackend.Stop()
+		}()
+
+		tsids := []idtool.ID{idtool.ID(rand.Int63())}
+		var expected float64 = float64(len(tsids))
+
+		for i, tsid := range tsids {
+			fakeBackend.AddTSIDMetadata(tsid, &messages.MetadataProperties{
+				Metric: "service.request.count",
+			})
+			fakeBackend.SetTSIDFloatData(tsid, float64(i+1))
+		}
+
+		pg := `data('service.request.count', filter=filter('service.name', 'myservice')).sum().publish()`
+		fakeBackend.AddProgramTSIDs(pg, tsids)
+
+		parsedUrl, err := url.Parse(fakeBackend.URL())
+		require.NoError(t, err)
+
+		// Only the host of the fake backend URL is reused; the provider is given an http:// address.
+		sp, err := NewSplunkProvider("1m",
+			flaggerv1.MetricTemplateProvider{Address: fmt.Sprintf("http://%s", parsedUrl.Host)},
+			map[string][]byte{
+				signalFxTokenSecretKey: []byte(fakeBackend.AccessToken),
+			},
+		)
+		require.NoError(t, err)
+
+		f, err := sp.RunQuery(pg)
+		require.NoError(t, err)
+		assert.Equal(t, expected, f)
+	})
+
+	// Three TSIDs get metadata but no data is set; RunQuery must fail with ErrNoValuesFound.
+	t.Run("no values", func(t *testing.T) {
+		fakeBackend := signalflow.NewRunningFakeBackend()
+		go func() {
+			<-time.After(3 * time.Second)
+			fakeBackend.Stop()
+		}()
+
+		tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())}
+		for _, tsid := range tsids {
+			fakeBackend.AddTSIDMetadata(tsid, &messages.MetadataProperties{
+				Metric: "service.request.count",
+			})
+		}
+
+		pg := `data('service.request.count', filter=filter('service.name', 'myservice')).sum().publish()`
+		fakeBackend.AddProgramTSIDs(pg, tsids)
+
+		parsedUrl, err := url.Parse(fakeBackend.URL())
+		require.NoError(t, err)
+
+		sp, err := NewSplunkProvider("1m",
+			flaggerv1.MetricTemplateProvider{Address: fmt.Sprintf("http://%s", parsedUrl.Host)},
+			map[string][]byte{
+				signalFxTokenSecretKey: []byte(fakeBackend.AccessToken),
+			},
+		)
+		require.NoError(t, err)
+		_, err = sp.RunQuery(pg)
+		require.True(t, errors.Is(err, ErrNoValuesFound))
+	})
+
+	// Three TSIDs each get float data; RunQuery must fail with ErrMultipleValuesReturned.
+	t.Run("multiple values", func(t *testing.T) {
+		fakeBackend := signalflow.NewRunningFakeBackend()
+		go func() {
+			<-time.After(3 * time.Second)
+			fakeBackend.Stop()
+		}()
+
+		tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())}
+		for i, tsid := range tsids {
+			fakeBackend.AddTSIDMetadata(tsid, &messages.MetadataProperties{
+				Metric: "service.request.count",
+			})
+			fakeBackend.SetTSIDFloatData(tsid, float64(i+1))
+		}
+		pg := `data('service.request.count', filter=filter('service.name', 'myservice')).sum().publish(); data('service.request.count', filter=filter('service.name', 'myservice2')).sum().publish()`
+		fakeBackend.AddProgramTSIDs(pg, tsids)
+
+		parsedUrl, err := url.Parse(fakeBackend.URL())
+		require.NoError(t, err)
+
+		sp, err := NewSplunkProvider("1m",
+			flaggerv1.MetricTemplateProvider{Address: fmt.Sprintf("http://%s", parsedUrl.Host)},
+			map[string][]byte{
+				signalFxTokenSecretKey: []byte(fakeBackend.AccessToken),
+			},
+		)
+		require.NoError(t, err)
+		_, err = sp.RunQuery(pg)
+		require.True(t, errors.Is(err, ErrMultipleValuesReturned))
+	})
+}
+
+// TestSplunkProvider_IsOnline points the provider at an httptest server that
+// answers with a fixed status code and asserts that the token header is sent;
+// 200 must yield no error and 401 must yield an error.
+func TestSplunkProvider_IsOnline(t *testing.T) {
+	for _, c := range []struct {
+		code        int
+		errExpected bool
+	}{
+		{code: http.StatusOK, errExpected: false},
+		{code: http.StatusUnauthorized, errExpected: true},
+	} {
+		t.Run(fmt.Sprintf("%d", c.code), func(t *testing.T) {
+			token := "token"
+			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				assert.Equal(t, token, r.Header.Get(signalFxTokenHeaderKey))
+				w.WriteHeader(c.code)
+			}))
+			defer ts.Close()
+
+			sp, err := NewSplunkProvider("1m",
+				flaggerv1.MetricTemplateProvider{Address: ts.URL},
+				map[string][]byte{
+					signalFxTokenSecretKey: []byte(token),
+				},
+			)
+			require.NoError(t, err)
+
+			_, err = sp.IsOnline()
+			if c.errExpected {
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}