WIP: Upgrade google.golang.org/grpc to v1.69.0 #10224

Draft · wants to merge 9 commits into main
7 changes: 1 addition & 6 deletions go.mod
@@ -49,7 +49,7 @@ require (
golang.org/x/net v0.32.0
golang.org/x/sync v0.10.0
golang.org/x/time v0.8.0
google.golang.org/grpc v1.68.1
google.golang.org/grpc v1.69.2
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)
@@ -138,7 +138,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.57.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.30.0 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/mail.v2 v2.3.1 // indirect
gopkg.in/telebot.v3 v3.2.1 // indirect
@@ -319,7 +318,3 @@ replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-aler
// - https://github.com/grafana/franz-go/pull/3
// - https://github.com/grafana/franz-go/pull/4
replace github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937

// Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions.
// Following https://github.com/grafana/dskit/pull/581
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0
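
With the old pin to v1.65.0 dropped, the module graph should now resolve grpc-go to the version required above. As a quick sanity check — a minimal sketch, assuming the exported `grpc.Version` constant that grpc-go ships — the linked version can be printed at runtime:

```go
package main

import (
	"fmt"

	"google.golang.org/grpc"
)

func main() {
	// grpc.Version reports the grpc-go version compiled into the binary; with
	// the replace directive removed it should match the go.mod requirement.
	fmt.Println("linked grpc-go version:", grpc.Version)
}
```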
1,188 changes: 63 additions & 1,125 deletions go.sum

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pkg/distributor/distributor.go
@@ -1534,6 +1534,7 @@ func NextOrCleanup(next PushFunc, pushReq *Request) (_ PushFunc, maybeCleanup fu
func (d *Distributor) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
pushReq := NewParsedRequest(req)
pushReq.AddCleanup(func() {
req.FreeBuffer()
mimirpb.ReuseSlice(req.Timeseries)
})

@@ -2699,6 +2700,11 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
if err != nil {
return nil, err
}
defer func() {
for _, resp := range resps {
resp.FreeBuffer()
}
}()

metrics := map[uint64]labels.Labels{}
for _, resp := range resps {
@@ -2715,6 +2721,8 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
if err := queryLimiter.AddSeries(m); err != nil {
return nil, err
}
// Make safe copies of labels.
m.InternStrings(strings.Clone)
result = append(result, m)
}
return result, nil
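
The distributor changes follow one pattern: request and response buffers are now pooled and released via `FreeBuffer()`, so any labels that must outlive the cleanup have to be detached from that memory first — hence `m.InternStrings(strings.Clone)`. A minimal standalone sketch of that cloning step using the Prometheus labels API (illustrative only, not the Mimir code path):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/prometheus/model/labels"
)

func main() {
	// Pretend these labels were decoded from a pooled gRPC receive buffer, so
	// their backing strings may be invalidated once that buffer is freed.
	ls := labels.FromStrings("job", "api", "instance", "a:9090")

	// InternStrings rewrites every name and value through the given function;
	// passing strings.Clone detaches them from the original backing memory.
	ls.InternStrings(strings.Clone)

	fmt.Println(ls.String())
}
```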
180 changes: 114 additions & 66 deletions pkg/distributor/query.go
@@ -11,6 +11,7 @@ import (
"context"
"io"
"slices"
"strings"
"time"

"github.com/go-kit/log/level"
@@ -65,6 +66,11 @@ func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, m
if err != nil {
return err
}
defer func() {
for _, r := range results {
r.FreeBuffer()
}
}()

result = mergeExemplarQueryResponses(results)

@@ -195,7 +201,18 @@ func mergeExemplarQueryResponses(results []*ingester_client.ExemplarQueryRespons

result := make([]mimirpb.TimeSeries, len(exemplarResults))
for i, k := range keys {
result[i] = exemplarResults[k]
ts := exemplarResults[k]
for i, l := range ts.Labels {
ts.Labels[i].Name = strings.Clone(l.Name)
ts.Labels[i].Value = strings.Clone(l.Value)
}
for i, e := range ts.Exemplars {
for j, l := range e.Labels {
ts.Exemplars[i].Labels[j].Name = strings.Clone(l.Name)
ts.Exemplars[i].Labels[j].Value = strings.Clone(l.Value)
}
}
result[i] = ts
}

return &ingester_client.ExemplarQueryResponse{Timeseries: result}
@@ -239,92 +256,37 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
log.Span.SetTag("ingester_address", ing.Addr)
log.Span.SetTag("ingester_zone", ing.Zone)

var result ingesterQueryResult

client, err := d.ingesterPool.GetClientForInstance(*ing)
if err != nil {
return ingesterQueryResult{}, err
return result, err
}

stream, err = client.(ingester_client.IngesterClient).QueryStream(ctx, req)
if err != nil {
return ingesterQueryResult{}, err
return result, err
}

result := ingesterQueryResult{}

// Why retain the batches rather than iteratively build a single slice?
// If we iteratively build a single slice, we'll spend a lot of time copying elements as the slice grows beyond its capacity.
// So instead, we build the slice in one go once we know how many series we have.
var streamingSeriesBatches [][]labels.Labels
streamingSeriesCount := 0

for {
resp, err := stream.Recv()
var err error
var isEOS bool
streamingSeriesCount, streamingSeriesBatches, isEOS, err = receiveResponse(stream, streamingSeriesCount, streamingSeriesBatches, queryLimiter, &result)
if errors.Is(err, io.EOF) {
// We will never get an EOF here from an ingester that is streaming chunks, so we don't need to do anything to set up streaming here.
return result, nil
} else if err != nil {
return ingesterQueryResult{}, err
return result, err
}

if len(resp.Timeseries) > 0 {
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); limitErr != nil {
return ingesterQueryResult{}, limitErr
}
}

result.timeseriesBatches = append(result.timeseriesBatches, resp.Timeseries)
} else if len(resp.Chunkseries) > 0 {
// Enforce the max chunks limits.
if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
return ingesterQueryResult{}, err
}

if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
return ingesterQueryResult{}, err
}

for _, series := range resp.Chunkseries {
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); err != nil {
return ingesterQueryResult{}, err
}
}

if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil {
return ingesterQueryResult{}, err
}

result.chunkseriesBatches = append(result.chunkseriesBatches, resp.Chunkseries)
} else if len(resp.StreamingSeries) > 0 {
labelsBatch := make([]labels.Labels, 0, len(resp.StreamingSeries))
streamingSeriesCount += len(resp.StreamingSeries)

for _, s := range resp.StreamingSeries {
l := mimirpb.FromLabelAdaptersToLabels(s.Labels)

if err := queryLimiter.AddSeries(l); err != nil {
return ingesterQueryResult{}, err
}

// We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves.
if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil {
return ingesterQueryResult{}, err
}

if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil {
return ingesterQueryResult{}, err
}

labelsBatch = append(labelsBatch, l)
}

streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
}

if resp.IsEndOfSeriesStream {
if isEOS {
if streamingSeriesCount > 0 {
result.streamingSeries.Series = make([]labels.Labels, 0, streamingSeriesCount)

for _, batch := range streamingSeriesBatches {
result.streamingSeries.Series = append(result.streamingSeries.Series, batch...)
}
@@ -391,7 +353,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
key := mimirpb.FromLabelAdaptersToKeyString(series.Labels)
existing := hashToTimeSeries[key]
existing.Labels = series.Labels
if existing.Samples == nil {
if len(existing.Samples) == 0 {
existing.Samples = series.Samples
} else {
existing.Samples = mergeSamples(existing.Samples, series.Samples)
@@ -428,6 +390,92 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
return resp, nil
}

func receiveResponse(stream ingester_client.Ingester_QueryStreamClient, streamingSeriesCount int, streamingSeriesBatches [][]labels.Labels, queryLimiter *limiter.QueryLimiter, result *ingesterQueryResult) (int, [][]labels.Labels, bool, error) {
resp, err := stream.Recv()
if err != nil {
return 0, nil, false, err
}
defer resp.FreeBuffer()

if len(resp.Timeseries) > 0 {
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); limitErr != nil {
return 0, nil, false, limitErr
}
}

for i, ts := range resp.Timeseries {
for j, l := range ts.Labels {
resp.Timeseries[i].Labels[j].Name = strings.Clone(l.Name)
resp.Timeseries[i].Labels[j].Value = strings.Clone(l.Value)
}
for j, e := range ts.Exemplars {
for k, l := range e.Labels {
resp.Timeseries[i].Exemplars[j].Labels[k].Name = strings.Clone(l.Name)
resp.Timeseries[i].Exemplars[j].Labels[k].Value = strings.Clone(l.Value)
}
}
}
result.timeseriesBatches = append(result.timeseriesBatches, resp.Timeseries)
} else if len(resp.Chunkseries) > 0 {
// Enforce the max chunks limits.
if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
return 0, nil, false, err
}

if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
return 0, nil, false, err
}

for _, series := range resp.Chunkseries {
if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(series.Labels)); err != nil {
return 0, nil, false, err
}
}

if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil {
return 0, nil, false, err
}

for i, s := range resp.Chunkseries {
for j, l := range s.Labels {
resp.Chunkseries[i].Labels[j].Name = strings.Clone(l.Name)
resp.Chunkseries[i].Labels[j].Value = strings.Clone(l.Value)
}
for j, c := range s.Chunks {
resp.Chunkseries[i].Chunks[j].Data = slices.Clone(c.Data)
}
}
result.chunkseriesBatches = append(result.chunkseriesBatches, resp.Chunkseries)
} else if len(resp.StreamingSeries) > 0 {
labelsBatch := make([]labels.Labels, 0, len(resp.StreamingSeries))
streamingSeriesCount += len(resp.StreamingSeries)

for _, s := range resp.StreamingSeries {
l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels)

if err := queryLimiter.AddSeries(l); err != nil {
return 0, nil, false, err
}

// We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves.
if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil {
return 0, nil, false, err
}

if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil {
return 0, nil, false, err
}

labelsBatch = append(labelsBatch, l)
}

streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
}

return streamingSeriesCount, streamingSeriesBatches, resp.IsEndOfSeriesStream, nil
}

// estimatedIngestersPerSeries estimates the number of ingesters that will have chunks for each streaming series.
func (d *Distributor) estimatedIngestersPerSeries(replicationSets []ring.ReplicationSet) int {
if d.cfg.IngestStorageConfig.Enabled {
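
The extracted `receiveResponse` helper relies on ordering: `defer resp.FreeBuffer()` only runs when the function returns, after every label string and chunk payload the result retains has been detached with `strings.Clone` / `slices.Clone`. A minimal sketch of that clone-before-free ordering, with a hypothetical response type standing in for the ingester client message:

```go
package main

import (
	"fmt"
	"slices"
	"strings"
)

// pooledResp is a hypothetical stand-in for a streamed gRPC response whose
// label strings and chunk bytes alias a pooled receive buffer.
type pooledResp struct {
	labelValue string
	chunkData  []byte
}

// freeBuffer stands in for returning the receive buffer to a pool; after it
// runs, labelValue and chunkData must no longer be read through this response.
func (r *pooledResp) freeBuffer() {
	r.labelValue = ""
	r.chunkData = nil
}

// receiveOne copies everything it wants to keep before the deferred free runs.
func receiveOne(resp *pooledResp) (string, []byte) {
	defer resp.freeBuffer() // runs after the clones below, so the copies are safe

	v := strings.Clone(resp.labelValue)  // detach the string from the buffer
	data := slices.Clone(resp.chunkData) // detach the chunk bytes from the buffer
	return v, data
}

func main() {
	resp := &pooledResp{labelValue: "up", chunkData: []byte{0x01, 0x02}}
	v, data := receiveOne(resp)
	fmt.Println(v, data)
}
```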
3 changes: 3 additions & 0 deletions pkg/frontend/querymiddleware/model.pb.go

Some generated files are not rendered by default.

14 changes: 14 additions & 0 deletions pkg/frontend/querymiddleware/model.pb.go.expdiff
@@ -0,0 +1,14 @@
diff --git a/pkg/frontend/querymiddleware/model.pb.go b/pkg/frontend/querymiddleware/model.pb.go
index 315ed4eed..47f80838c 100644
--- a/pkg/frontend/querymiddleware/model.pb.go
+++ b/pkg/frontend/querymiddleware/model.pb.go
@@ -83,9 +83,6 @@ func (m *PrometheusHeader) GetValues() []string {
}

type PrometheusResponse struct {
- // Keep reference to buffer for unsafe references.
- github_com_grafana_mimir_pkg_mimirpb.BufferHolder
-
Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"status"`
Data *PrometheusData `protobuf:"bytes,2,opt,name=Data,proto3" json:"data,omitempty"`
ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"`
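
For context, the `BufferHolder` embed being dropped here is, per its inline comment, what lets a decoded message keep a reference to its receive buffer so that fields can alias it without copying, and release it later. A purely hypothetical sketch of that embed pattern — illustrative names only, not the actual mimirpb API:

```go
package main

import "fmt"

// bufferHolder is a hypothetical sketch of the embed: it carries the pooled
// buffer that the message's fields may alias, so the caller can free it once
// the message is no longer needed.
type bufferHolder struct {
	buf []byte
}

// SetBuffer and FreeBuffer are illustrative names, not the real mimirpb API.
func (b *bufferHolder) SetBuffer(buf []byte) { b.buf = buf }
func (b *bufferHolder) FreeBuffer()          { b.buf = nil } // real code would return buf to a pool

// queryResponse embeds the holder, as the generated PrometheusResponse did
// before this change.
type queryResponse struct {
	bufferHolder
	Status string
}

func main() {
	resp := queryResponse{Status: "success"}
	resp.SetBuffer(make([]byte, 64))
	// ... use resp while the buffer is still alive ...
	resp.FreeBuffer()
	fmt.Println(resp.Status)
}
```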
23 changes: 20 additions & 3 deletions pkg/ingester/client/buffering_client_test.go
@@ -12,6 +12,8 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
@@ -68,7 +70,12 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) {
}

reqs := serv.requests()
require.Equal(t, requestsToSend, reqs)
diff := cmp.Diff(requestsToSend, reqs, cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)
})

t.Run("push with pooling", func(t *testing.T) {
@@ -85,7 +92,12 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) {
}

reqs := serv.requests()
require.Equal(t, requestsToSend, reqs)
diff := cmp.Diff(requestsToSend, reqs, cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)

// Verify that pool was used.
require.Greater(t, pool.Gets.Load(), int64(0))
@@ -149,7 +161,12 @@ func TestWriteRequestBufferingClient_Push_WithMultipleMarshalCalls(t *testing.T)
_, err := bufferingClient.Push(ctx, req)
require.NoError(t, err)

require.Equal(t, serv.requests(), []*mimirpb.WriteRequest{req})
diff := cmp.Diff([]*mimirpb.WriteRequest{req}, serv.requests(), cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
return a.TimeSeries.Equal(b.TimeSeries)
}))
}))
require.Empty(t, diff)

// Verify that all buffers from the pool were returned.
require.Greater(t, pool.Gets.Load(), int64(0))
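
The test updates swap `require.Equal` for `go-cmp`, because the write requests now carry unexported buffer-tracking state (`BufferHolder`) that should not affect equality. A minimal sketch of the same technique on a hypothetical struct:

```go
package main

import (
	"fmt"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

// message is a hypothetical stand-in for a type that gained unexported,
// comparison-irrelevant state (e.g. a pooled buffer reference).
type message struct {
	Name string
	buf  []byte // unexported; should not affect equality in tests
}

func main() {
	a := message{Name: "series", buf: []byte("pooled")}
	b := message{Name: "series", buf: nil}

	// IgnoreUnexported drops the unexported fields from the comparison,
	// so only the exported payload has to match.
	diff := cmp.Diff(a, b, cmpopts.IgnoreUnexported(message{}))
	fmt.Println("diff is empty:", diff == "")
}
```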