From 086a698b2195adb6f3463ebbd032e780f39d2050 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Tue, 28 May 2024 14:19:33 +0100 Subject: [PATCH] Cut patch release `v0.35.1` (#7394) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * compact: recover from panics (#7318) For https://github.com/thanos-io/thanos/issues/6775, it would be useful to know the exact block IDs to aid debugging. Signed-off-by: Giedrius Statkevičius * Sidecar: wait for prometheus on startup (#7323) Signed-off-by: Michael Hoffmann * Receive: fix serverAsClient.Series goroutines leak (#6948) * fix serverAsClient goroutines leak Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> * fix lint Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> * update changelog Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> * delete invalid comment Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> * remove temp dev test Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> * remove timer channel drain Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> --------- Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> * Receive: fix stats (#7373) If we account stats for remote write and local writes we will count them twice since the remote write will be counted locally again by the remote receiver instance. Signed-off-by: Michael Hoffmann * *: Ensure objstore flag values are masked & disable debug/pprof/cmdline (#7382) * *: Ensure objstore flag values are masked & disable debug/pprof/cmdline Signed-off-by: Saswata Mukherjee * small fix Signed-off-by: Saswata Mukherjee --------- Signed-off-by: Saswata Mukherjee * Query: dont pass query hints to avoid triggering pushdown (#7392) If we have a new querier it will create query hints even without the pushdown feature being present anymore. Old sidecars will then trigger query pushdown which leads to broken max,min,max_over_time and min_over_time. Signed-off-by: Michael Hoffmann * Cut patch release v0.35.1 Signed-off-by: Saswata Mukherjee --------- Signed-off-by: Giedrius Statkevičius Signed-off-by: Michael Hoffmann Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> Signed-off-by: Saswata Mukherjee Co-authored-by: Giedrius Statkevičius Co-authored-by: Michael Hoffmann Co-authored-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> --- CHANGELOG.md | 17 ++++ VERSION | 2 +- cmd/thanos/main.go | 5 ++ cmd/thanos/sidecar.go | 99 ++++++++++++++--------- pkg/compact/compact.go | 16 ++++ pkg/query/querier.go | 15 ---- pkg/receive/handler.go | 35 ++++----- pkg/server/http/http.go | 1 - pkg/store/bucket.go | 2 - pkg/store/prometheus.go | 14 +--- pkg/store/proxy_merge.go | 139 +++++++++++++-------------------- pkg/store/storepb/inprocess.go | 10 +-- 12 files changed, 178 insertions(+), 177 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dcc28f6a0c..29b6f2ae18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,23 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Removed +## [v0.35.1](https://github.com/thanos-io/thanos/tree/release-0.35) - 28.05.2024 + +### Fixed + +- [#7323](https://github.com/thanos-io/thanos/pull/7323) Sidecar: wait for prometheus on startup +- [#6948](https://github.com/thanos-io/thanos/pull/6948) Receive: fix goroutines leak during series requests to thanos store api. +- [#7382](https://github.com/thanos-io/thanos/pull/7382) *: Ensure objstore flag values are masked & disable debug/pprof/cmdline +- [#7392](https://github.com/thanos-io/thanos/pull/7392) Query: fix broken min, max for pre 0.34.1 sidecars +- [#7373](https://github.com/thanos-io/thanos/pull/7373) Receive: Fix stats for remote write +- [#7318](https://github.com/thanos-io/thanos/pull/7318) Compactor: Recover from panic to log block ID + +### Added + +### Changed + +### Removed + ## [v0.35.0](https://github.com/thanos-io/thanos/tree/release-0.35) - 02.05.2024 ### Fixed diff --git a/VERSION b/VERSION index 7b52f5e517..731b95d7fc 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.35.0 +0.35.1 diff --git a/cmd/thanos/main.go b/cmd/thanos/main.go index 892c9824e1..a29d702d0d 100644 --- a/cmd/thanos/main.go +++ b/cmd/thanos/main.go @@ -214,6 +214,11 @@ func getFlagsMap(flags []*kingpin.FlagModel) map[string]string { if boilerplateFlags.GetFlag(f.Name) != nil { continue } + // Mask inline objstore flag which can have credentials. + if f.Name == "objstore.config" || f.Name == "objstore.config-file" { + flagsMap[f.Name] = "" + continue + } flagsMap[f.Name] = f.Value.String() } diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 9b8c2feded..95ad4ba693 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -172,64 +172,87 @@ func runSidecar( Help: "Boolean indicator whether the sidecar can reach its Prometheus peer.", }) - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - // Only check Prometheus's flags when upload is enabled. - if uploads { - // Check prometheus's flags to ensure same sidecar flags. - if err := validatePrometheus(ctx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil { - return errors.Wrap(err, "validate Prometheus flags") - } - } + ctx := context.Background() + // Only check Prometheus's flags when upload is enabled. + if uploads { + // Check prometheus's flags to ensure same sidecar flags. + // We retry infinitely until we validated prometheus flags + err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error { + iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout) + defer iterCancel() - // We retry infinitely until we reach and fetch BuildVersion from our Prometheus. - err := runutil.Retry(2*time.Second, ctx.Done(), func() error { - if err := m.BuildVersion(ctx); err != nil { + if err := validatePrometheus(iterCtx, m.client, logger, conf.shipper.ignoreBlockSize, m); err != nil { level.Warn(logger).Log( - "msg", "failed to fetch prometheus version. Is Prometheus running? Retrying", + "msg", "failed to validate prometheus flags. Is Prometheus running? Retrying", "err", err, ) return err } level.Info(logger).Log( - "msg", "successfully loaded prometheus version", + "msg", "successfully validated prometheus flags", ) return nil }) if err != nil { - return errors.Wrap(err, "failed to get prometheus version") + return errors.Wrap(err, "failed to validate prometheus flags") } + } - // Blocking query of external labels before joining as a Source Peer into gossip. - // We retry infinitely until we reach and fetch labels from our Prometheus. - err = runutil.Retry(2*time.Second, ctx.Done(), func() error { - if err := m.UpdateLabels(ctx); err != nil { - level.Warn(logger).Log( - "msg", "failed to fetch initial external labels. Is Prometheus running? Retrying", - "err", err, - ) - promUp.Set(0) - statusProber.NotReady(err) - return err - } + // We retry infinitely until we reach and fetch BuildVersion from our Prometheus. + err := runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error { + iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout) + defer iterCancel() - level.Info(logger).Log( - "msg", "successfully loaded prometheus external labels", - "external_labels", m.Labels().String(), + if err := m.BuildVersion(iterCtx); err != nil { + level.Warn(logger).Log( + "msg", "failed to fetch prometheus version. Is Prometheus running? Retrying", + "err", err, ) - promUp.Set(1) - statusProber.Ready() - return nil - }) - if err != nil { - return errors.Wrap(err, "initial external labels query") + return err } - if len(m.Labels()) == 0 { - return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.") + level.Info(logger).Log( + "msg", "successfully loaded prometheus version", + ) + return nil + }) + if err != nil { + return errors.Wrap(err, "failed to get prometheus version") + } + + // Blocking query of external labels before joining as a Source Peer into gossip. + // We retry infinitely until we reach and fetch labels from our Prometheus. + err = runutil.Retry(conf.prometheus.getConfigInterval, ctx.Done(), func() error { + iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout) + defer iterCancel() + + if err := m.UpdateLabels(iterCtx); err != nil { + level.Warn(logger).Log( + "msg", "failed to fetch initial external labels. Is Prometheus running? Retrying", + "err", err, + ) + return err } + level.Info(logger).Log( + "msg", "successfully loaded prometheus external labels", + "external_labels", m.Labels().String(), + ) + return nil + }) + if err != nil { + return errors.Wrap(err, "initial external labels query") + } + + if len(m.Labels()) == 0 { + return errors.New("no external labels configured on Prometheus server, uniquely identifying external labels must be configured; see https://thanos.io/tip/thanos/storage.md#external-labels for details.") + } + promUp.Set(1) + statusProber.Ready() + + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { // Periodically query the Prometheus config. We use this as a heartbeat as well as for updating // the external labels we apply. return runutil.Repeat(conf.prometheus.getConfigInterval, ctx.Done(), func() error { diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 2f0dbfdb27..7232119b0b 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "time" @@ -871,6 +872,21 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") } + defer func() { + if p := recover(); p != nil { + var sb strings.Builder + + cgIDs := cg.IDs() + for i, blid := range cgIDs { + _, _ = sb.WriteString(blid.String()) + if i < len(cgIDs)-1 { + _, _ = sb.WriteString(",") + } + } + rerr = fmt.Errorf("paniced while compacting %s: %v", sb.String(), p) + } + }() + errChan := make(chan error, 1) err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) { shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp, blockDeletableChecker, compactionLifecycleCallback, errChan) diff --git a/pkg/query/querier.go b/pkg/query/querier.go index c26bd025ce..9a1a311097 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -241,20 +241,6 @@ func aggrsFromFunc(f string) []storepb.Aggr { return []storepb.Aggr{storepb.Aggr_COUNT, storepb.Aggr_SUM} } -func storeHintsFromPromHints(hints *storage.SelectHints) *storepb.QueryHints { - return &storepb.QueryHints{ - StepMillis: hints.Step, - Func: &storepb.Func{ - Name: hints.Func, - }, - Grouping: &storepb.Grouping{ - By: hints.By, - Labels: hints.Grouping, - }, - Range: &storepb.Range{Millis: hints.Range}, - } -} - func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { if hints == nil { hints = &storage.SelectHints{ @@ -351,7 +337,6 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . ShardInfo: q.shardInfo, PartialResponseStrategy: q.partialResponseStrategy, SkipChunks: q.skipChunks, - QueryHints: storeHintsFromPromHints(hints), } if q.isDedupEnabled() { // Soft ask to sort without replica labels and push them at the end of labelset. diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 208c01b131..d6c2596f36 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -681,35 +681,32 @@ type remoteWriteParams struct { alreadyReplicated bool } -func (h *Handler) gatherWriteStats(writes ...map[endpointReplica]map[string]trackedSeries) tenantRequestStats { +func (h *Handler) gatherWriteStats(localWrites map[endpointReplica]map[string]trackedSeries) tenantRequestStats { var stats tenantRequestStats = make(tenantRequestStats) - for _, write := range writes { - for er := range write { - for tenant, series := range write[er] { - samples := 0 + for er := range localWrites { + for tenant, series := range localWrites[er] { + samples := 0 - for _, ts := range series.timeSeries { - samples += len(ts.Samples) - } + for _, ts := range series.timeSeries { + samples += len(ts.Samples) + } - if st, ok := stats[tenant]; ok { - st.timeseries += len(series.timeSeries) - st.totalSamples += samples + if st, ok := stats[tenant]; ok { + st.timeseries += len(series.timeSeries) + st.totalSamples += samples - stats[tenant] = st - } else { - stats[tenant] = requestStats{ - timeseries: len(series.timeSeries), - totalSamples: samples, - } + stats[tenant] = st + } else { + stats[tenant] = requestStats{ + timeseries: len(series.timeSeries), + totalSamples: samples, } } } } return stats - } func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (tenantRequestStats, error) { @@ -739,7 +736,7 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) ( return stats, err } - stats = h.gatherWriteStats(localWrites, remoteWrites) + stats = h.gatherWriteStats(localWrites) // Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go // asynchronously and with this capacity we will never block on writing to the channel. diff --git a/pkg/server/http/http.go b/pkg/server/http/http.go index 795d6c4fdb..fc92100e3f 100644 --- a/pkg/server/http/http.go +++ b/pkg/server/http/http.go @@ -117,7 +117,6 @@ func (s *Server) Handle(pattern string, handler http.Handler) { func registerProfiler(mux *http.ServeMux) { mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) mux.HandleFunc("/debug/pprof/profile", pprof.Profile) mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 43ac0d6c1a..d5e90a209a 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1571,7 +1571,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store var resp respSet if s.sortingStrategy == sortingStrategyStore { resp = newEagerRespSet( - srv.Context(), span, 10*time.Minute, blk.meta.ULID.String(), @@ -1585,7 +1584,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store ) } else { resp = newLazyRespSet( - srv.Context(), span, 10*time.Minute, blk.meta.ULID.String(), diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index d52fcb07d9..721e9ed51e 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -163,19 +163,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto // Don't ask for more than available time. This includes potential `minTime` flag limit. availableMinTime, _ := p.timestamps() if r.MinTime < availableMinTime { - // Align min time with the step to avoid missing data when it gets retrieved by the upper layer's PromQL engine. - // This also is necessary when Sidecar uploads a block and then availableMinTime - // becomes a fixed timestamp. - if r.QueryHints != nil && r.QueryHints.StepMillis != 0 { - diff := availableMinTime - r.MinTime - r.MinTime += (diff / r.QueryHints.StepMillis) * r.QueryHints.StepMillis - // Add one more to strictly fit within --min-time -> infinity. - if r.MinTime != availableMinTime { - r.MinTime += r.QueryHints.StepMillis - } - } else { - r.MinTime = availableMinTime - } + r.MinTime = availableMinTime } extLsetToRemove := map[string]struct{}{} diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index 0235958b4f..62469b7959 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -211,13 +211,11 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} { type lazyRespSet struct { // Generic parameters. span opentracing.Span - cl storepb.Store_SeriesClient closeSeries context.CancelFunc storeName string storeLabelSets []labels.Labels storeLabels map[string]struct{} frameTimeout time.Duration - ctx context.Context // Internal bookkeeping. dataOrFinishEvent *sync.Cond @@ -294,7 +292,6 @@ func (l *lazyRespSet) At() *storepb.SeriesResponse { } func newLazyRespSet( - ctx context.Context, span opentracing.Span, frameTimeout time.Duration, storeName string, @@ -311,12 +308,10 @@ func newLazyRespSet( respSet := &lazyRespSet{ frameTimeout: frameTimeout, - cl: cl, storeName: storeName, storeLabelSets: storeLabelSets, closeSeries: closeSeries, span: span, - ctx: ctx, dataOrFinishEvent: dataAvailable, bufferedResponsesMtx: bufferedResponsesMtx, bufferedResponses: bufferedResponses, @@ -353,19 +348,9 @@ func newLazyRespSet( defer t.Reset(frameTimeout) } - select { - case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st) - l.span.SetTag("err", err.Error()) + resp, err := cl.Recv() - l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) - l.noMoreData = true - l.dataOrFinishEvent.Signal() - l.bufferedResponsesMtx.Unlock() - return false - default: - resp, err := cl.Recv() + if err != nil { if err == io.EOF { l.bufferedResponsesMtx.Lock() l.noMoreData = true @@ -374,45 +359,43 @@ func newLazyRespSet( return false } - if err != nil { - // TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled. - var rerr error - if t != nil && !t.Stop() && errors.Is(err, context.Canceled) { - // Most likely the per-Recv timeout has been reached. - // There's a small race between canceling and the Recv() - // but this is most likely true. + var rerr error + // If timer is already stopped + if t != nil && !t.Stop() { + if errors.Is(err, context.Canceled) { + // The per-Recv timeout has been reached. rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st) - } else { - rerr = errors.Wrapf(err, "receive series from %s", st) } - - l.span.SetTag("err", rerr.Error()) - - l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) - l.noMoreData = true - l.dataOrFinishEvent.Signal() - l.bufferedResponsesMtx.Unlock() - return false - } - - numResponses++ - bytesProcessed += resp.Size() - - if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { - return true + } else { + rerr = errors.Wrapf(err, "receive series from %s", st) } - if resp.GetSeries() != nil { - seriesStats.Count(resp.GetSeries()) - } + l.span.SetTag("err", rerr.Error()) l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, resp) + l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) + l.noMoreData = true l.dataOrFinishEvent.Signal() l.bufferedResponsesMtx.Unlock() + return false + } + + numResponses++ + bytesProcessed += resp.Size() + + if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { return true } + + if resp.GetSeries() != nil { + seriesStats.Count(resp.GetSeries()) + } + + l.bufferedResponsesMtx.Lock() + l.bufferedResponses = append(l.bufferedResponses, resp) + l.dataOrFinishEvent.Signal() + l.bufferedResponsesMtx.Unlock() + return true } var t *time.Timer @@ -509,7 +492,6 @@ func newAsyncRespSet( switch retrievalStrategy { case LazyRetrieval: return newLazyRespSet( - seriesCtx, span, frameTimeout, st.String(), @@ -522,7 +504,6 @@ func newAsyncRespSet( ), nil case EagerRetrieval: return newEagerRespSet( - seriesCtx, span, frameTimeout, st.String(), @@ -556,8 +537,6 @@ func (l *lazyRespSet) Close() { type eagerRespSet struct { // Generic parameters. span opentracing.Span - cl storepb.Store_SeriesClient - ctx context.Context closeSeries context.CancelFunc frameTimeout time.Duration @@ -576,7 +555,6 @@ type eagerRespSet struct { } func newEagerRespSet( - ctx context.Context, span opentracing.Span, frameTimeout time.Duration, storeName string, @@ -591,9 +569,7 @@ func newEagerRespSet( ret := &eagerRespSet{ span: span, closeSeries: closeSeries, - cl: cl, frameTimeout: frameTimeout, - ctx: ctx, bufferedResponses: []*storepb.SeriesResponse{}, wg: &sync.WaitGroup{}, shardMatcher: shardMatcher, @@ -638,48 +614,45 @@ func newEagerRespSet( defer t.Reset(frameTimeout) } - select { - case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", storeName) - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) - l.span.SetTag("err", err.Error()) - return false - default: - resp, err := cl.Recv() + resp, err := cl.Recv() + + if err != nil { if err == io.EOF { return false } - if err != nil { - // TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled. - var rerr error - if t != nil && !t.Stop() && errors.Is(err, context.Canceled) { - // Most likely the per-Recv timeout has been reached. - // There's a small race between canceling and the Recv() - // but this is most likely true. + + var rerr error + // If timer is already stopped + if t != nil && !t.Stop() { + <-t.C // Drain the channel if it was already stopped. + if errors.Is(err, context.Canceled) { + // The per-Recv timeout has been reached. rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName) - } else { - rerr = errors.Wrapf(err, "receive series from %s", storeName) } - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) - l.span.SetTag("err", rerr.Error()) - return false + } else { + rerr = errors.Wrapf(err, "receive series from %s", storeName) } - numResponses++ - bytesProcessed += resp.Size() - - if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { - return true - } + l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) + l.span.SetTag("err", rerr.Error()) + return false + } - if resp.GetSeries() != nil { - seriesStats.Count(resp.GetSeries()) - } + numResponses++ + bytesProcessed += resp.Size() - l.bufferedResponses = append(l.bufferedResponses, resp) + if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { return true } + + if resp.GetSeries() != nil { + seriesStats.Count(resp.GetSeries()) + } + + l.bufferedResponses = append(l.bufferedResponses, resp) + return true } + var t *time.Timer if frameTimeout > 0 { t = time.AfterFunc(frameTimeout, closeSeries) diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go index 24b90f3885..e09210d442 100644 --- a/pkg/store/storepb/inprocess.go +++ b/pkg/store/storepb/inprocess.go @@ -36,7 +36,9 @@ func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc inSrv := &inProcessStream{recv: make(chan *SeriesResponse), err: make(chan error)} inSrv.ctx, inSrv.cancel = context.WithCancel(ctx) go func() { - inSrv.err <- s.srv.Series(in, inSrv) + if err := s.srv.Series(in, inSrv); err != nil { + inSrv.err <- err + } close(inSrv.err) close(inSrv.recv) }() @@ -88,15 +90,13 @@ func (s *inProcessClientStream) CloseSend() error { func (s *inProcessClientStream) Recv() (*SeriesResponse, error) { select { - case <-s.srv.ctx.Done(): - return nil, s.srv.ctx.Err() case r, ok := <-s.srv.recv: if !ok { return nil, io.EOF } return r, nil - case err := <-s.srv.err: - if err == nil { + case err, ok := <-s.srv.err: + if !ok { return nil, io.EOF } return nil, err