Skip to content

Commit

Permalink
bug fix
Browse files Browse the repository at this point in the history
Signed-off-by: kane8n <[email protected]>
  • Loading branch information
kane8n committed Nov 28, 2024
1 parent 0b73e24 commit f83e8da
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 30 deletions.
69 changes: 43 additions & 26 deletions pkg/metrics/providers/splunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (

// https://docs.datadoghq.com/api/
const (
signalFxMTSQueryPath = "/v2/signalflow/execute"
signalFxValidationPath = "/v2/metric?limit=1"
signalFxSignalFlowApiPath = "/v2/signalflow"
signalFxValidationPath = "/v2/metric?limit=1"

signalFxTokenSecretKey = "sf_token_key"

Expand Down Expand Up @@ -70,7 +70,7 @@ func NewSplunkProvider(metricInterval string,

sp := SplunkProvider{
timeout: 5 * time.Second,
metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxMTSQueryPath, "http", "ws", 1), "api", "stream", 1),
metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxSignalFlowApiPath, "http", "ws", 1), "api", "stream", 1),
apiValidationEndpoint: strings.Replace(strings.Replace(address+signalFxValidationPath, "ws", "http", 1), "stream", "api", 1),
}

Expand Down Expand Up @@ -102,37 +102,54 @@ func (p *SplunkProvider) RunQuery(query string) (float64, error) {
now := time.Now().UnixMilli()
comp, err := c.Execute(ctx, &signalflow.ExecuteRequest{
Program: query,
Start: time.Unix(0, (now-p.fromDelta)*time.Millisecond.Nanoseconds()),
Stop: time.Unix(0, now*time.Millisecond.Nanoseconds()),
Start: time.UnixMilli(now - p.fromDelta),
Stop: time.UnixMilli(now),
Immediate: true,
})
if err != nil {
return 0, fmt.Errorf("error executing query: %w", err)
}

select {
case dataMsg := <-comp.Data():
payloads := slices.DeleteFunc(dataMsg.Payloads, func(msg messages.DataPayload) bool {
return msg.Value() == nil
})
if len(payloads) < 1 {
return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound)
}
_payloads := slices.Clone(payloads)
slices.SortFunc(_payloads, func(i, j messages.DataPayload) int {
return cmp.Compare(i.TSID, j.TSID)
})
if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 {
return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned)
}
return payloads[len(payloads)-1].Value().(float64), nil
case <-time.After(p.timeout):
err := comp.Stop(ctx)
if err != nil {
return 0, fmt.Errorf("error stopping query: %w", err)
payloads := p.receivePaylods(comp)

if comp.Err() != nil {
return 0, fmt.Errorf("error executing query: %w", comp.Err())
}
payloads = slices.DeleteFunc(payloads, func(msg messages.DataPayload) bool {
return msg.Value() == nil
})
if len(payloads) < 1 {
return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound)
}
_payloads := slices.Clone(payloads)
slices.SortFunc(_payloads, func(i, j messages.DataPayload) int {
return cmp.Compare(i.TSID, j.TSID)
})
if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 {
return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned)
}
payload := payloads[len(payloads)-1]
switch payload.Type {
case messages.ValTypeLong:
return float64(payload.Int64()), nil
case messages.ValTypeDouble:
return payload.Float64(), nil
case messages.ValTypeInt:
return float64(payload.Int32()), nil
default:
return 0, fmt.Errorf("invalid response: UnsupportedValueType")
}
}

func (p *SplunkProvider) receivePaylods(comp *signalflow.Computation) []messages.DataPayload {
payloads := []messages.DataPayload{}
for dataMsg := range comp.Data() {
if dataMsg == nil {
continue
}
return 0, fmt.Errorf("timeout waiting for query result")
payloads = append(payloads, dataMsg.Payloads...)
}
return payloads
}

// IsOnline calls the provider endpoint and returns an error if the API is unreachable
Expand Down
17 changes: 13 additions & 4 deletions pkg/metrics/providers/splunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestNewSplunkProvider(t *testing.T) {

sp, err := NewSplunkProvider("100s", flaggerv1.MetricTemplateProvider{Address: "https://api.us1.signalfx.com"}, cs)
require.NoError(t, err)
assert.Equal(t, "wss://stream.us1.signalfx.com/v2/signalflow/execute", sp.metricsQueryEndpoint)
assert.Equal(t, "wss://stream.us1.signalfx.com/v2/signalflow", sp.metricsQueryEndpoint)
assert.Equal(t, "https://api.us1.signalfx.com/v2/metric?limit=1", sp.apiValidationEndpoint)
assert.Equal(t, int64(md.Milliseconds()*signalFxFromDeltaMultiplierOnMetricInterval), sp.fromDelta)
assert.Equal(t, token, sp.token)
Expand All @@ -56,7 +56,10 @@ func TestNewSplunkProvider(t *testing.T) {
func TestSplunkProvider_RunQuery(t *testing.T) {
t.Run("ok", func(t *testing.T) {
fakeBackend := signalflow.NewRunningFakeBackend()
defer fakeBackend.Stop()
go func() {
<-time.After(3 * time.Second)
fakeBackend.Stop()
}()

tsids := []idtool.ID{idtool.ID(rand.Int63())}
var expected float64 = float64(len(tsids))
Expand Down Expand Up @@ -89,7 +92,10 @@ func TestSplunkProvider_RunQuery(t *testing.T) {

t.Run("no values", func(t *testing.T) {
fakeBackend := signalflow.NewRunningFakeBackend()
defer fakeBackend.Stop()
go func() {
<-time.After(3 * time.Second)
fakeBackend.Stop()
}()

tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())}
for _, tsid := range tsids {
Expand Down Expand Up @@ -117,7 +123,10 @@ func TestSplunkProvider_RunQuery(t *testing.T) {

t.Run("multiple values", func(t *testing.T) {
fakeBackend := signalflow.NewRunningFakeBackend()
defer fakeBackend.Stop()
go func() {
<-time.After(3 * time.Second)
fakeBackend.Stop()
}()

tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())}
for i, tsid := range tsids {
Expand Down

0 comments on commit f83e8da

Please sign in to comment.