From c37f0ed40fba4980a4c87dea3f7d56a5a6bf3087 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Tue, 5 Sep 2023 17:45:06 +0200 Subject: [PATCH] Store: always sort, just compare labelset in proxy heap Signed-off-by: Michael Hoffmann --- pkg/store/bucket.go | 2 +- pkg/store/flushable.go | 8 +------- pkg/store/prometheus.go | 2 +- pkg/store/proxy_heap.go | 18 +----------------- pkg/store/proxy_heap_test.go | 31 +++++++++++++++++++++++++++++++ pkg/store/tsdb.go | 2 +- 6 files changed, 36 insertions(+), 27 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 027122447ef..f093f0cc61a 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1254,7 +1254,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { - srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels) + srv := newFlushableServer(seriesSrv) if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go index c41b67d152c..c10fdf51a14 100644 --- a/pkg/store/flushable.go +++ b/pkg/store/flushable.go @@ -9,7 +9,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" ) // flushableServer is an extension of storepb.Store_SeriesServer with a Flush method. @@ -20,13 +19,8 @@ type flushableServer interface { func newFlushableServer( upstream storepb.Store_SeriesServer, - labelNames stringset.Set, - replicaLabels []string, ) flushableServer { - if labelNames.HasAny(replicaLabels) { - return &resortingServer{Store_SeriesServer: upstream} - } - return &passthroughServer{Store_SeriesServer: upstream} + return &resortingServer{Store_SeriesServer: upstream} } // passthroughServer is a flushableServer that forwards all data to diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 244ae5592d5..4c69df5d4fd 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -149,7 +149,7 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { // Series returns all series for a requested time range and label matcher. func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - s := newFlushableServer(seriesSrv, p.labelNamesSet(), r.WithoutReplicaLabels) + s := newFlushableServer(seriesSrv) extLset := p.externalLabelsFn() match, matchers, err := matchesExternalLabels(r.Matchers, extLset) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 7ea18b134d9..a10d3d489b3 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -174,26 +174,10 @@ func (h *ProxyResponseHeap) Less(i, j int) bool { jResp := h.nodes[j].rs.At() if iResp.GetSeries() != nil && jResp.GetSeries() != nil { - // Response sets are sorted before adding external labels. - // This comparison excludes those labels to keep the same order. - iStoreLbls := h.nodes[i].rs.StoreLabels() - jStoreLbls := h.nodes[j].rs.StoreLabels() - iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels) jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels) - copyLabels(&h.iLblsScratch, iLbls) - copyLabels(&h.jLblsScratch, jLbls) - - var iExtLbls, jExtLbls labels.Labels - h.iLblsScratch, iExtLbls = dropLabels(h.iLblsScratch, iStoreLbls) - h.jLblsScratch, jExtLbls = dropLabels(h.jLblsScratch, jStoreLbls) - - c := labels.Compare(h.iLblsScratch, h.jLblsScratch) - if c != 0 { - return c < 0 - } - return labels.Compare(iExtLbls, jExtLbls) < 0 + return labels.Compare(iLbls, jLbls) < 0 } else if iResp.GetSeries() == nil && jResp.GetSeries() != nil { return true } else if iResp.GetSeries() != nil && jResp.GetSeries() == nil { diff --git a/pkg/store/proxy_heap_test.go b/pkg/store/proxy_heap_test.go index fdfec178ca5..5e2d32dc531 100644 --- a/pkg/store/proxy_heap_test.go +++ b/pkg/store/proxy_heap_test.go @@ -190,6 +190,37 @@ func TestProxyResponseHeapSort(t *testing.T) { storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), }, }, + { + title: "test", + input: []respSet{ + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + storeLabels: map[string]struct{}{"receive": {}, "tenant_id": {}, "thanos_replica": {}}, + }, + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + storeLabels: map[string]struct{}{"cluster": {}, "prometheus": {}, "prometheus_replica": {}, "receive": {}, "tenant_id": {}, "thanos_replica": {}, "thanos_ruler_replica": {}}, + }, + }, + exp: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + }, } { t.Run(tcase.title, func(t *testing.T) { h := NewProxyResponseHeap(tcase.input...) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 73604b92365..f1bd44040d5 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -175,7 +175,7 @@ type CloseDelegator interface { // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), r.WithoutReplicaLabels) + srv := newFlushableServer(seriesSrv) match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil {