Skip to content

Commit

Permalink
Change to use signalflow api
Browse files Browse the repository at this point in the history
Signed-off-by: kane8n <[email protected]>
  • Loading branch information
kane8n committed Nov 28, 2024
1 parent 476a870 commit 0b73e24
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 90 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/influxdata/influxdb-client-go/v2 v2.13.0
github.com/prometheus/client_golang v1.19.1
github.com/signalfx/signalflow-client-go v0.1.0
github.com/signalfx/signalfx-go v1.34.0
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.7.0
Expand Down Expand Up @@ -50,6 +52,7 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/imdario/mergo v0.3.15 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw4Z96qg=
github.com/googleapis/gax-go/v2 v2.12.4/go.mod h1:KYEYLorsnIGDi/rPC8b5TdlB9kbKoFubselGIoBMCwI=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down Expand Up @@ -163,6 +165,10 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/signalfx/signalflow-client-go v0.1.0 h1:aqyt+st3/y8x8JtuwYRL9pOkOTJb+KeCoRWi0SuY5vw=
github.com/signalfx/signalflow-client-go v0.1.0/go.mod h1:mY4DTAZuLHyMNGBjSrNdCg5kUU0hSkYjukAnjsVbsQs=
github.com/signalfx/signalfx-go v1.34.0 h1:OQ6tyMY4efWB57EPIQqrpWrAfcSdyfa+bLtmAe7GLfE=
github.com/signalfx/signalfx-go v1.34.0/go.mod h1:IpGZLPvCKNFyspAXoS480jB02mocTpo0KYd8jbl6/T8=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
Expand Down
89 changes: 41 additions & 48 deletions pkg/metrics/providers/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@ limitations under the License.
package providers

import (
"cmp"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"slices"
"strings"
"time"

"github.com/signalfx/signalflow-client-go/signalflow"
"github.com/signalfx/signalflow-client-go/signalflow/messages"

flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
)

// https://docs.datadoghq.com/api/
const (
signalFxMTSQueryPath = "/v1/timeserieswindow"
signalFxMTSQueryPath = "/v2/signalflow/execute"
signalFxValidationPath = "/v2/metric?limit=1"

signalFxTokenSecretKey = "sf_token_key"
Expand All @@ -51,7 +55,6 @@ type SplunkProvider struct {
}

type splunkResponse struct {
Data map[string][][]float64 `json:"data"`
}

// NewSplunkProvider takes a canary spec, a provider spec and the credentials map, and
Expand All @@ -67,8 +70,8 @@ func NewSplunkProvider(metricInterval string,

sp := SplunkProvider{
timeout: 5 * time.Second,
metricsQueryEndpoint: address + signalFxMTSQueryPath,
apiValidationEndpoint: address + signalFxValidationPath,
metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxMTSQueryPath, "http", "ws", 1), "api", "stream", 1),
apiValidationEndpoint: strings.Replace(strings.Replace(address+signalFxValidationPath, "ws", "http", 1), "stream", "api", 1),
}

if b, ok := credentials[signalFxTokenSecretKey]; ok {
Expand All @@ -88,58 +91,48 @@ func NewSplunkProvider(metricInterval string,

// RunQuery executes the query and converts the first result to float64
func (p *SplunkProvider) RunQuery(query string) (float64, error) {

req, err := http.NewRequest("GET", p.metricsQueryEndpoint, nil)
c, err := signalflow.NewClient(signalflow.StreamURL(p.metricsQueryEndpoint), signalflow.AccessToken(p.token))
if err != nil {
return 0, fmt.Errorf("error http.NewRequest: %w", err)
return 0, fmt.Errorf("error creating signalflow client: %w", err)
}

req.Header.Set(signalFxTokenHeaderKey, p.token)
now := time.Now().UnixMilli()
q := req.URL.Query()
q.Add("query", query)
q.Add("startMS", strconv.FormatInt(now-p.fromDelta, 10))
q.Add("endMS", strconv.FormatInt(now, 10))
req.URL.RawQuery = q.Encode()

ctx, cancel := context.WithTimeout(req.Context(), p.timeout)
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
defer cancel()
r, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return 0, fmt.Errorf("request failed: %w", err)
}

defer r.Body.Close()
b, err := io.ReadAll(r.Body)
now := time.Now().UnixMilli()
comp, err := c.Execute(ctx, &signalflow.ExecuteRequest{
Program: query,
Start: time.Unix(0, (now-p.fromDelta)*time.Millisecond.Nanoseconds()),
Stop: time.Unix(0, now*time.Millisecond.Nanoseconds()),
Immediate: true,
})
if err != nil {
return 0, fmt.Errorf("error reading body: %w", err)
return 0, fmt.Errorf("error executing query: %w", err)
}

if r.StatusCode != http.StatusOK {
return 0, fmt.Errorf("error response: %s: %w", string(b), err)
}

var res splunkResponse
if err := json.Unmarshal(b, &res); err != nil {
return 0, fmt.Errorf("error unmarshaling result: %w, '%s'", err, string(b))
}

if len(res.Data) < 1 {
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound)
}

if len(res.Data) > 1 {
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrMultipleValuesReturned)
}

for _, v := range res.Data {
vs := v[len(v)-1]
if len(vs) < 1 {
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound)
select {
case dataMsg := <-comp.Data():
payloads := slices.DeleteFunc(dataMsg.Payloads, func(msg messages.DataPayload) bool {
return msg.Value() == nil
})
if len(payloads) < 1 {
return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound)
}
_payloads := slices.Clone(payloads)
slices.SortFunc(_payloads, func(i, j messages.DataPayload) int {
return cmp.Compare(i.TSID, j.TSID)
})
if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 {
return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned)
}
return payloads[len(payloads)-1].Value().(float64), nil
case <-time.After(p.timeout):
err := comp.Stop(ctx)
if err != nil {
return 0, fmt.Errorf("error stopping query: %w", err)
}
return vs[1], nil
return 0, fmt.Errorf("timeout waiting for query result")
}
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound)
}

// IsOnline calls the provider endpoint and returns an error if the API is unreachable
Expand Down
103 changes: 61 additions & 42 deletions pkg/metrics/providers/splunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ package providers
import (
"errors"
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
"strconv"
"net/url"
"testing"
"time"

"github.com/signalfx/signalflow-client-go/signalflow"
"github.com/signalfx/signalflow-client-go/signalflow/messages"
"github.com/signalfx/signalfx-go/idtool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -43,84 +47,99 @@ func TestNewSplunkProvider(t *testing.T) {

sp, err := NewSplunkProvider("100s", flaggerv1.MetricTemplateProvider{Address: "https://api.us1.signalfx.com"}, cs)
require.NoError(t, err)
assert.Equal(t, "https://api.us1.signalfx.com/v1/timeserieswindow", sp.metricsQueryEndpoint)
assert.Equal(t, "wss://stream.us1.signalfx.com/v2/signalflow/execute", sp.metricsQueryEndpoint)
assert.Equal(t, "https://api.us1.signalfx.com/v2/metric?limit=1", sp.apiValidationEndpoint)
assert.Equal(t, int64(md.Milliseconds()*signalFxFromDeltaMultiplierOnMetricInterval), sp.fromDelta)
assert.Equal(t, token, sp.token)
}

func TestSplunkProvider_RunQuery(t *testing.T) {
token := "token"
t.Run("ok", func(t *testing.T) {
expected := 1.11111
eq := `sf_metric:service.request.count AND http_status_code:*`
now := time.Now().UnixMilli()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
aq := r.URL.Query().Get("query")
assert.Equal(t, eq, aq)
assert.Equal(t, token, r.Header.Get(signalFxTokenHeaderKey))

from, err := strconv.ParseInt(r.URL.Query().Get("startMS"), 10, 64)
if assert.NoError(t, err) {
assert.Less(t, from, now)
}
fakeBackend := signalflow.NewRunningFakeBackend()
defer fakeBackend.Stop()

to, err := strconv.ParseInt(r.URL.Query().Get("endMS"), 10, 64)
if assert.NoError(t, err) {
assert.GreaterOrEqual(t, to, now)
}
tsids := []idtool.ID{idtool.ID(rand.Int63())}
var expected float64 = float64(len(tsids))

json := fmt.Sprintf(`{"data":{"AAAAAAAAAAA":[[1731643210000,%f]]},"errors":[]}`, expected)
w.Write([]byte(json))
}))
defer ts.Close()
for i, tsid := range tsids {
fakeBackend.AddTSIDMetadata(tsid, &messages.MetadataProperties{
Metric: "service.request.count",
})
fakeBackend.SetTSIDFloatData(tsid, float64(i+1))
}

pg := `data('service.request.count', filter=filter('service.name', 'myservice')).sum().publish()`
fakeBackend.AddProgramTSIDs(pg, tsids)

parsedUrl, err := url.Parse(fakeBackend.URL())
require.NoError(t, err)

sp, err := NewSplunkProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
flaggerv1.MetricTemplateProvider{Address: fmt.Sprintf("http://%s", parsedUrl.Host)},
map[string][]byte{
signalFxTokenSecretKey: []byte(token),
signalFxTokenSecretKey: []byte(fakeBackend.AccessToken),
},
)
require.NoError(t, err)

f, err := sp.RunQuery(eq)
f, err := sp.RunQuery(pg)
require.NoError(t, err)
assert.Equal(t, expected, f)
})

t.Run("no values", func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json := fmt.Sprintf(`{"data": {}, "errors": []}`)
w.Write([]byte(json))
}))
defer ts.Close()
fakeBackend := signalflow.NewRunningFakeBackend()
defer fakeBackend.Stop()

tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())}
for _, tsid := range tsids {
fakeBackend.AddTSIDMetadata(tsid, &messages.MetadataProperties{
Metric: "service.request.count",
})
}

pg := `data('service.request.count', filter=filter('service.name', 'myservice')).sum().publish()`
fakeBackend.AddProgramTSIDs(pg, tsids)

parsedUrl, err := url.Parse(fakeBackend.URL())
require.NoError(t, err)

sp, err := NewSplunkProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
flaggerv1.MetricTemplateProvider{Address: fmt.Sprintf("http://%s", parsedUrl.Host)},
map[string][]byte{
signalFxTokenSecretKey: []byte(token),
signalFxTokenSecretKey: []byte(fakeBackend.AccessToken),
},
)
require.NoError(t, err)
_, err = sp.RunQuery("")
_, err = sp.RunQuery(pg)
require.True(t, errors.Is(err, ErrNoValuesFound))
})

t.Run("multiple values", func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json := fmt.Sprintf(`{"data":{"AAAAAAAAAAA":[[1731643210000,6]],"AAAAAAAAAAE":[[1731643210000,6]]},"errors":[]}`)
w.Write([]byte(json))
}))
defer ts.Close()
fakeBackend := signalflow.NewRunningFakeBackend()
defer fakeBackend.Stop()

tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())}
for i, tsid := range tsids {
fakeBackend.AddTSIDMetadata(tsid, &messages.MetadataProperties{
Metric: "service.request.count",
})
fakeBackend.SetTSIDFloatData(tsid, float64(i+1))
}
pg := `data('service.request.count', filter=filter('service.name', 'myservice')).sum().publish(); data('service.request.count', filter=filter('service.name', 'myservice2')).sum().publish()`
fakeBackend.AddProgramTSIDs(pg, tsids)

parsedUrl, err := url.Parse(fakeBackend.URL())
require.NoError(t, err)

sp, err := NewSplunkProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
flaggerv1.MetricTemplateProvider{Address: fmt.Sprintf("http://%s", parsedUrl.Host)},
map[string][]byte{
signalFxTokenSecretKey: []byte(token),
signalFxTokenSecretKey: []byte(fakeBackend.AccessToken),
},
)
require.NoError(t, err)
_, err = sp.RunQuery("")
_, err = sp.RunQuery(pg)
require.True(t, errors.Is(err, ErrMultipleValuesReturned))
})
}
Expand Down

0 comments on commit 0b73e24

Please sign in to comment.