Skip to content

Commit

Permalink
frontend: active series experimental custom decoder bugfix (#7965)
Browse files Browse the repository at this point in the history
* frontend: active series experimental custom decoder bugfix

ref: https://raintank-corp.slack.com/archives/C058HL8A1M5/p1714010102452949

Signed-off-by: Miguel Ángel Ortuño <[email protected]>

* extra unit test

ref: #7965 (comment)

Signed-off-by: Miguel Ángel Ortuño <[email protected]>

---------

Signed-off-by: Miguel Ángel Ortuño <[email protected]>
  • Loading branch information
ortuman authored Apr 29, 2024
1 parent f469726 commit deb40d8
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
)

const (
activeSeriesChunkMaxBufferSize = 1024 * 1024 // 1MB
defaultActiveSeriesChunkMaxBufferSize = 1024 * 1024 // 1MB

checkContextCancelledBytesInterval = 256
)

var activeSeriesChunkBufferPool = sync.Pool{
New: func() any {
return bytes.NewBuffer(make([]byte, 0, activeSeriesChunkMaxBufferSize))
return bytes.NewBuffer(make([]byte, 0, defaultActiveSeriesChunkMaxBufferSize))
},
}

Expand Down Expand Up @@ -56,13 +56,14 @@ func reuseActiveSeriesDataStreamBuffer(buf *bytes.Buffer) {
}

type shardActiveSeriesResponseDecoder struct {
ctx context.Context
rc io.ReadCloser
br *bufio.Reader
strBuff []byte
streamCh chan<- *bytes.Buffer
readBytesCount int
err error
ctx context.Context
rc io.ReadCloser
br *bufio.Reader
strBuff []byte
streamCh chan<- *bytes.Buffer
readBytesCount int
err error
chunkBufferMaxSize int
}

func (d *shardActiveSeriesResponseDecoder) reset(ctx context.Context, rc io.ReadCloser, streamCh chan<- *bytes.Buffer) {
Expand All @@ -74,6 +75,7 @@ func (d *shardActiveSeriesResponseDecoder) reset(ctx context.Context, rc io.Read
d.strBuff = d.strBuff[:0]
d.readBytesCount = 0
d.err = nil
d.chunkBufferMaxSize = defaultActiveSeriesChunkMaxBufferSize
}

func (d *shardActiveSeriesResponseDecoder) stickError(err error) {
Expand Down Expand Up @@ -199,9 +201,10 @@ func (d *shardActiveSeriesResponseDecoder) streamData() error {
return d.err
}

if cb.Len() >= activeSeriesChunkMaxBufferSize {
if cb.Len() >= d.chunkBufferMaxSize {
d.streamCh <- cb
cb = activeSeriesChunkBufferPool.Get().(*bytes.Buffer)
firstItem = true
}
}
d.checkContextCanceled()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package querymiddleware
import (
"bytes"
"context"
"fmt"
"io"
"strings"
"testing"
Expand All @@ -15,10 +14,11 @@ import (

func TestShardActiveSeriesResponseDecoder(t *testing.T) {
tcs := []struct {
name string
input string
expectedOutput string
expectedError string
name string
input string
expectedOutput string
expectedError string
chunkBufferMaxSize int
}{
{
name: "empty response",
Expand All @@ -40,6 +40,12 @@ func TestShardActiveSeriesResponseDecoder(t *testing.T) {
input: `{"data":[{"__name__":"metric","shard":"1"},{"__name__":"metric","shard":"2"}]}`,
expectedOutput: `{"__name__":"metric","shard":"1"},{"__name__":"metric","shard":"2"}`,
},
{
name: "reach max buffer size",
input: `{"data":[{"__name__":"metric","shard":"1"},{"__name__":"metric","shard":"2"},{"__name__":"metric","shard":"3"}]}`,
expectedOutput: `{"__name__":"metric","shard":"1"},{"__name__":"metric","shard":"2"},{"__name__":"metric","shard":"3"}`,
chunkBufferMaxSize: 16,
},
{
name: "unexpected comma",
input: `{"data":[{"__name__":"metric","shard":"1"},,,{"__name__":"metric","shard":"2"}`,
Expand Down Expand Up @@ -81,6 +87,9 @@ func TestShardActiveSeriesResponseDecoder(t *testing.T) {

r := strings.NewReader(tc.input)
d := borrowShardActiveSeriesResponseDecoder(context.Background(), io.NopCloser(r), streamCh)
if tc.chunkBufferMaxSize > 0 {
d.chunkBufferMaxSize = tc.chunkBufferMaxSize
}

err := d.decode()
if err == nil {
Expand All @@ -90,8 +99,13 @@ func TestShardActiveSeriesResponseDecoder(t *testing.T) {
}()

// Drain the data channel.
firstItem := true
for streamBuf := range streamCh {
fmt.Println(streamBuf.String())
if !firstItem {
dataStr.WriteString(",")
} else {
firstItem = false
}
dataStr.WriteString(streamBuf.String())
}
} else {
Expand Down

0 comments on commit deb40d8

Please sign in to comment.