From f83e8dac3377b656f1e072d7ba20a68f4dee94d1 Mon Sep 17 00:00:00 2001 From: kane8n Date: Thu, 28 Nov 2024 19:33:21 +0900 Subject: [PATCH] bug fix Signed-off-by: kane8n --- pkg/metrics/providers/splunk.go | 69 +++++++++++++++++----------- pkg/metrics/providers/splunk_test.go | 17 +++++-- 2 files changed, 56 insertions(+), 30 deletions(-) diff --git a/pkg/metrics/providers/splunk.go b/pkg/metrics/providers/splunk.go index 4afbf46b6..44540ad1b 100644 --- a/pkg/metrics/providers/splunk.go +++ b/pkg/metrics/providers/splunk.go @@ -34,8 +34,8 @@ import ( // https://docs.datadoghq.com/api/ const ( - signalFxMTSQueryPath = "/v2/signalflow/execute" - signalFxValidationPath = "/v2/metric?limit=1" + signalFxSignalFlowApiPath = "/v2/signalflow" + signalFxValidationPath = "/v2/metric?limit=1" signalFxTokenSecretKey = "sf_token_key" @@ -70,7 +70,7 @@ func NewSplunkProvider(metricInterval string, sp := SplunkProvider{ timeout: 5 * time.Second, - metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxMTSQueryPath, "http", "ws", 1), "api", "stream", 1), + metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxSignalFlowApiPath, "http", "ws", 1), "api", "stream", 1), apiValidationEndpoint: strings.Replace(strings.Replace(address+signalFxValidationPath, "ws", "http", 1), "stream", "api", 1), } @@ -102,37 +102,54 @@ func (p *SplunkProvider) RunQuery(query string) (float64, error) { now := time.Now().UnixMilli() comp, err := c.Execute(ctx, &signalflow.ExecuteRequest{ Program: query, - Start: time.Unix(0, (now-p.fromDelta)*time.Millisecond.Nanoseconds()), - Stop: time.Unix(0, now*time.Millisecond.Nanoseconds()), + Start: time.UnixMilli(now - p.fromDelta), + Stop: time.UnixMilli(now), Immediate: true, }) if err != nil { return 0, fmt.Errorf("error executing query: %w", err) } - select { - case dataMsg := <-comp.Data(): - payloads := slices.DeleteFunc(dataMsg.Payloads, func(msg messages.DataPayload) bool { - return msg.Value() == nil - }) - if len(payloads) < 1 { - return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound) - } - _payloads := slices.Clone(payloads) - slices.SortFunc(_payloads, func(i, j messages.DataPayload) int { - return cmp.Compare(i.TSID, j.TSID) - }) - if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 { - return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned) - } - return payloads[len(payloads)-1].Value().(float64), nil - case <-time.After(p.timeout): - err := comp.Stop(ctx) - if err != nil { - return 0, fmt.Errorf("error stopping query: %w", err) + payloads := p.receivePaylods(comp) + + if comp.Err() != nil { + return 0, fmt.Errorf("error executing query: %w", comp.Err()) + } + payloads = slices.DeleteFunc(payloads, func(msg messages.DataPayload) bool { + return msg.Value() == nil + }) + if len(payloads) < 1 { + return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound) + } + _payloads := slices.Clone(payloads) + slices.SortFunc(_payloads, func(i, j messages.DataPayload) int { + return cmp.Compare(i.TSID, j.TSID) + }) + if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 { + return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned) + } + payload := payloads[len(payloads)-1] + switch payload.Type { + case messages.ValTypeLong: + return float64(payload.Int64()), nil + case messages.ValTypeDouble: + return payload.Float64(), nil + case messages.ValTypeInt: + return float64(payload.Int32()), nil + default: + return 0, fmt.Errorf("invalid response: UnsupportedValueType") + } +} + +func (p *SplunkProvider) receivePaylods(comp *signalflow.Computation) []messages.DataPayload { + payloads := []messages.DataPayload{} + for dataMsg := range comp.Data() { + if dataMsg == nil { + continue } - return 0, fmt.Errorf("timeout waiting for query result") + payloads = append(payloads, dataMsg.Payloads...) } + return payloads } // IsOnline calls the provider endpoint and returns an error if the API is unreachable diff --git a/pkg/metrics/providers/splunk_test.go b/pkg/metrics/providers/splunk_test.go index b42459262..bd0302718 100644 --- a/pkg/metrics/providers/splunk_test.go +++ b/pkg/metrics/providers/splunk_test.go @@ -47,7 +47,7 @@ func TestNewSplunkProvider(t *testing.T) { sp, err := NewSplunkProvider("100s", flaggerv1.MetricTemplateProvider{Address: "https://api.us1.signalfx.com"}, cs) require.NoError(t, err) - assert.Equal(t, "wss://stream.us1.signalfx.com/v2/signalflow/execute", sp.metricsQueryEndpoint) + assert.Equal(t, "wss://stream.us1.signalfx.com/v2/signalflow", sp.metricsQueryEndpoint) assert.Equal(t, "https://api.us1.signalfx.com/v2/metric?limit=1", sp.apiValidationEndpoint) assert.Equal(t, int64(md.Milliseconds()*signalFxFromDeltaMultiplierOnMetricInterval), sp.fromDelta) assert.Equal(t, token, sp.token) @@ -56,7 +56,10 @@ func TestNewSplunkProvider(t *testing.T) { func TestSplunkProvider_RunQuery(t *testing.T) { t.Run("ok", func(t *testing.T) { fakeBackend := signalflow.NewRunningFakeBackend() - defer fakeBackend.Stop() + go func() { + <-time.After(3 * time.Second) + fakeBackend.Stop() + }() tsids := []idtool.ID{idtool.ID(rand.Int63())} var expected float64 = float64(len(tsids)) @@ -89,7 +92,10 @@ func TestSplunkProvider_RunQuery(t *testing.T) { t.Run("no values", func(t *testing.T) { fakeBackend := signalflow.NewRunningFakeBackend() - defer fakeBackend.Stop() + go func() { + <-time.After(3 * time.Second) + fakeBackend.Stop() + }() tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())} for _, tsid := range tsids { @@ -117,7 +123,10 @@ func TestSplunkProvider_RunQuery(t *testing.T) { t.Run("multiple values", func(t *testing.T) { fakeBackend := signalflow.NewRunningFakeBackend() - defer fakeBackend.Stop() + go func() { + <-time.After(3 * time.Second) + fakeBackend.Stop() + }() tsids := []idtool.ID{idtool.ID(rand.Int63()), idtool.ID(rand.Int63()), idtool.ID(rand.Int63())} for i, tsid := range tsids {