Skip to content

Commit

Permalink
add splunk provider
Browse files Browse the repository at this point in the history
Signed-off-by: kane8n <[email protected]>
  • Loading branch information
kane8n committed Dec 13, 2024
1 parent 30f4b25 commit d4bd0f2
Show file tree
Hide file tree
Showing 9 changed files with 448 additions and 0 deletions.
1 change: 1 addition & 0 deletions artifacts/flagger/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,7 @@ spec:
- graphite
- dynatrace
- keptn
- splunk
address:
description: API address of this provider
type: string
Expand Down
1 change: 1 addition & 0 deletions charts/flagger/crds/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,7 @@ spec:
- graphite
- dynatrace
- keptn
- splunk
address:
description: API address of this provider
type: string
Expand Down
51 changes: 51 additions & 0 deletions docs/gitbook/usage/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -730,3 +730,54 @@ Only relevant if the `type` is set to `analysis`.

For the type `analysis`, the value returned by the provider is either `0`
(if the analysis failed), or `1` (analysis passed).

## Splunk

You can create custom metric checks using the Splunk provider.

Create a secret that contains your authentication token that can be found in the Splunk o11y UI.

```yaml
apiVersion: v1
kind: Secret
metadata:
name: splunk
namespace: istio-system
data:
sf_token_key: your-access-token
```

Splunk template example:

```yaml
apiVersion: flagger.app/v1beta1
kind: MetricTemplate
metadata:
name: success-rate
namespace: istio-system
spec:
provider:
type: splunk
address: https://api.<REALM>.signalfx.com
secretRef:
name: splunk
query: |
total = data('traces.count', filter=filter('sf_service', '{{target}}')).sum().publish(enable=False)
success = data('traces.count', filter=filter('sf_service', '{{target}}') and filter('sf_error', 'false')).sum().publish(enable=False)
((success/total) * 100).publish()
```
The query format documentation can be found [here](https://dev.splunk.com/observability/docs/signalflow).

Reference the template in the canary analysis:

```yaml
analysis:
metrics:
- name: "success rate"
templateRef:
name: success-rate
namespace: istio-system
thresholdRange:
max: 99
interval: 1m
```
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/influxdata/influxdb-client-go/v2 v2.13.0
github.com/prometheus/client_golang v1.20.5
github.com/signalfx/signalflow-client-go v0.1.0
github.com/signalfx/signalfx-go v1.34.0
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.9.0
Expand Down Expand Up @@ -50,6 +52,7 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/imdario/mergo v0.3.15 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gT
github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA=
github.com/googleapis/gax-go/v2 v2.14.0 h1:f+jMrjBPl+DL9nI4IQzLUxMq7XrAqFYB7hBPqMNIe8o=
github.com/googleapis/gax-go/v2 v2.14.0/go.mod h1:lhBCnjdLrWRaPvLWhmc8IS24m9mr07qSYnHncrgo+zk=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down Expand Up @@ -172,6 +174,10 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/signalfx/signalflow-client-go v0.1.0 h1:aqyt+st3/y8x8JtuwYRL9pOkOTJb+KeCoRWi0SuY5vw=
github.com/signalfx/signalflow-client-go v0.1.0/go.mod h1:mY4DTAZuLHyMNGBjSrNdCg5kUU0hSkYjukAnjsVbsQs=
github.com/signalfx/signalfx-go v1.34.0 h1:OQ6tyMY4efWB57EPIQqrpWrAfcSdyfa+bLtmAe7GLfE=
github.com/signalfx/signalfx-go v1.34.0/go.mod h1:IpGZLPvCKNFyspAXoS480jB02mocTpo0KYd8jbl6/T8=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
Expand Down
1 change: 1 addition & 0 deletions kustomize/base/flagger/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,7 @@ spec:
- graphite
- dynatrace
- keptn
- splunk
address:
description: API address of this provider
type: string
Expand Down
2 changes: 2 additions & 0 deletions pkg/metrics/providers/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func (factory Factory) Provider(metricInterval string, provider flaggerv1.Metric
return NewDynatraceProvider(metricInterval, provider, credentials)
case "keptn":
return NewKeptnProvider(config)
case "splunk":
return NewSplunkProvider(metricInterval, provider, credentials)
default:
return NewPrometheusProvider(provider, credentials)
}
Expand Down
195 changes: 195 additions & 0 deletions pkg/metrics/providers/splunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
Copyright 2024 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package providers

import (
"cmp"
"context"
"fmt"
"io"
"net/http"
"slices"
"strings"
"time"

"github.com/signalfx/signalflow-client-go/signalflow"
"github.com/signalfx/signalflow-client-go/signalflow/messages"

flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
)

// https://dev.splunk.com/observability/reference
const (
signalFxSignalFlowApiPath = "/v2/signalflow"
signalFxValidationPath = "/v2/metric?limit=1"

signalFxTokenSecretKey = "sf_token_key"

signalFxTokenHeaderKey = "X-SF-Token"

signalFxFromDeltaMultiplierOnMetricInterval = 10
)

// SplunkProvider executes signalfx queries
type SplunkProvider struct {
metricsQueryEndpoint string
apiValidationEndpoint string

timeout time.Duration
token string
fromDelta int64
}

type splunkResponse struct {
}

// NewSplunkProvider takes a canary spec, a provider spec and the credentials map, and
// returns a Splunk client ready to execute queries against the API
func NewSplunkProvider(metricInterval string,
provider flaggerv1.MetricTemplateProvider,
credentials map[string][]byte) (*SplunkProvider, error) {

address := provider.Address
if address == "" {
return nil, fmt.Errorf("splunk endpoint is not set")
}

sp := SplunkProvider{
timeout: 5 * time.Second,
// Convert the configured address to match the protocol of the respective API
// ex.
// https://api.<REALM>.signalfx.com -> wss://stream.<REALM>.signalfx.com
// wss://stream.<REALM>.signalfx.com -> wss://stream.<REALM>.signalfx.com
metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxSignalFlowApiPath, "http", "ws", 1), "api", "stream", 1),
// ex.
// https://api.<REALM>.signalfx.com -> https://api.<REALM>.signalfx.com
// wss://stream.<REALM>.signalfx.com -> https://api.<REALM>.signalfx.com
apiValidationEndpoint: strings.Replace(strings.Replace(address+signalFxValidationPath, "ws", "http", 1), "stream", "api", 1),
}

if b, ok := credentials[signalFxTokenSecretKey]; ok {
sp.token = string(b)
} else {
return nil, fmt.Errorf("splunk credentials does not contain sf_token_key")
}

md, err := time.ParseDuration(metricInterval)
if err != nil {
return nil, fmt.Errorf("error parsing metric interval: %w", err)
}

sp.fromDelta = int64(signalFxFromDeltaMultiplierOnMetricInterval * md.Milliseconds())
return &sp, nil
}

// RunQuery executes the query and converts the first result to float64
func (p *SplunkProvider) RunQuery(query string) (float64, error) {
c, err := signalflow.NewClient(signalflow.StreamURL(p.metricsQueryEndpoint), signalflow.AccessToken(p.token))
if err != nil {
return 0, fmt.Errorf("error creating signalflow client: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()

now := time.Now().UnixMilli()
comp, err := c.Execute(ctx, &signalflow.ExecuteRequest{
Program: query,
Start: time.UnixMilli(now - p.fromDelta),
Stop: time.UnixMilli(now),
Immediate: true,
})
if err != nil {
return 0, fmt.Errorf("error executing query: %w", err)
}

payloads := p.receivePaylods(comp)

if comp.Err() != nil {
return 0, fmt.Errorf("error executing query: %w", comp.Err())
}

payloads = slices.DeleteFunc(payloads, func(msg messages.DataPayload) bool {
return msg.Value() == nil
})
if len(payloads) < 1 {
return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound)
}

// Error when a SignalFlow query returns two or more results.
// Since a different TSID is set for each metrics to be retrieved, eliminate duplicate TSIDs and determine if two or more TSIDs exist.
_payloads := slices.Clone(payloads)
slices.SortFunc(_payloads, func(i, j messages.DataPayload) int {
return cmp.Compare(i.TSID, j.TSID)
})
if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 {
return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned)
}

payload := payloads[len(payloads)-1]
switch payload.Type {
case messages.ValTypeLong:
return float64(payload.Int64()), nil
case messages.ValTypeDouble:
return payload.Float64(), nil
case messages.ValTypeInt:
return float64(payload.Int32()), nil
default:
return 0, fmt.Errorf("invalid response: UnsupportedValueType")
}
}

func (p *SplunkProvider) receivePaylods(comp *signalflow.Computation) []messages.DataPayload {
payloads := []messages.DataPayload{}
for dataMsg := range comp.Data() {
if dataMsg == nil {
continue
}
payloads = append(payloads, dataMsg.Payloads...)
}
return payloads
}

// IsOnline calls the provider endpoint and returns an error if the API is unreachable
func (p *SplunkProvider) IsOnline() (bool, error) {
req, err := http.NewRequest("GET", p.apiValidationEndpoint, nil)
if err != nil {
return false, fmt.Errorf("error http.NewRequest: %w", err)
}

req.Header.Add(signalFxTokenHeaderKey, p.token)

ctx, cancel := context.WithTimeout(req.Context(), p.timeout)
defer cancel()
r, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return false, fmt.Errorf("request failed: %w", err)
}

defer r.Body.Close()

b, err := io.ReadAll(r.Body)
if err != nil {
return false, fmt.Errorf("error reading body: %w", err)
}

if r.StatusCode != http.StatusOK {
return false, fmt.Errorf("error response: %s", string(b))
}

return true, nil
}
Loading

0 comments on commit d4bd0f2

Please sign in to comment.