From 882e41790ae5fc8dc1ab0edaba3cbf63cfc373f9 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Tue, 5 Sep 2023 17:45:06 +0200 Subject: [PATCH] Store: always sort, just compare labelset in proxy heap Signed-off-by: Michael Hoffmann --- pkg/store/bucket.go | 2 +- pkg/store/flushable.go | 16 +--------- pkg/store/prometheus.go | 2 +- pkg/store/proxy_heap.go | 50 ++----------------------------- pkg/store/proxy_heap_test.go | 58 +++++++++++++++++++----------------- pkg/store/tsdb.go | 2 +- 6 files changed, 37 insertions(+), 93 deletions(-) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 027122447ef..f093f0cc61a 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1254,7 +1254,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { - srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels) + srv := newFlushableServer(seriesSrv) if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { err = s.queryGate.Start(srv.Context()) diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go index c41b67d152c..60722d4b0a3 100644 --- a/pkg/store/flushable.go +++ b/pkg/store/flushable.go @@ -9,7 +9,6 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" - "github.com/thanos-io/thanos/pkg/stringset" ) // flushableServer is an extension of storepb.Store_SeriesServer with a Flush method. @@ -20,23 +19,10 @@ type flushableServer interface { func newFlushableServer( upstream storepb.Store_SeriesServer, - labelNames stringset.Set, - replicaLabels []string, ) flushableServer { - if labelNames.HasAny(replicaLabels) { - return &resortingServer{Store_SeriesServer: upstream} - } - return &passthroughServer{Store_SeriesServer: upstream} + return &resortingServer{Store_SeriesServer: upstream} } -// passthroughServer is a flushableServer that forwards all data to -// an upstream server without additional processing. -type passthroughServer struct { - storepb.Store_SeriesServer -} - -func (p *passthroughServer) Flush() error { return nil } - // resortingServer is a flushableServer that resorts all series by their labels. // This is required if replica labels are stored internally in a TSDB. // Data is resorted and sent to an upstream server upon calling Flush. diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 244ae5592d5..4c69df5d4fd 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -149,7 +149,7 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { // Series returns all series for a requested time range and label matcher. func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - s := newFlushableServer(seriesSrv, p.labelNamesSet(), r.WithoutReplicaLabels) + s := newFlushableServer(seriesSrv) extLset := p.externalLabelsFn() match, matchers, err := matchesExternalLabels(r.Matchers, extLset) diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 7ea18b134d9..757a9fa0d33 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -164,9 +164,7 @@ func (d *dedupResponseHeap) At() *storepb.SeriesResponse { // tournament trees need n-1 auxiliary nodes so there // might not be much of a difference. type ProxyResponseHeap struct { - nodes []ProxyResponseHeapNode - iLblsScratch labels.Labels - jLblsScratch labels.Labels + nodes []ProxyResponseHeapNode } func (h *ProxyResponseHeap) Less(i, j int) bool { @@ -174,26 +172,10 @@ func (h *ProxyResponseHeap) Less(i, j int) bool { jResp := h.nodes[j].rs.At() if iResp.GetSeries() != nil && jResp.GetSeries() != nil { - // Response sets are sorted before adding external labels. - // This comparison excludes those labels to keep the same order. - iStoreLbls := h.nodes[i].rs.StoreLabels() - jStoreLbls := h.nodes[j].rs.StoreLabels() - iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels) jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels) - copyLabels(&h.iLblsScratch, iLbls) - copyLabels(&h.jLblsScratch, jLbls) - - var iExtLbls, jExtLbls labels.Labels - h.iLblsScratch, iExtLbls = dropLabels(h.iLblsScratch, iStoreLbls) - h.jLblsScratch, jExtLbls = dropLabels(h.jLblsScratch, jStoreLbls) - - c := labels.Compare(h.iLblsScratch, h.jLblsScratch) - if c != 0 { - return c < 0 - } - return labels.Compare(iExtLbls, jExtLbls) < 0 + return labels.Compare(iLbls, jLbls) < 0 } else if iResp.GetSeries() == nil && jResp.GetSeries() != nil { return true } else if iResp.GetSeries() != nil && jResp.GetSeries() == nil { @@ -794,34 +776,6 @@ func rmLabels(l labels.Labels, labelsToRemove map[string]struct{}) labels.Labels return l } -// dropLabels removes labels from the given label set and returns the removed labels. -func dropLabels(l labels.Labels, labelsToDrop map[string]struct{}) (labels.Labels, labels.Labels) { - cutoff := len(l) - for i := 0; i < len(l); i++ { - if i == cutoff { - break - } - if _, ok := labelsToDrop[l[i].Name]; !ok { - continue - } - - lbl := l[i] - l = append(append(l[:i], l[i+1:]...), lbl) - cutoff-- - i-- - } - - return l[:cutoff], l[cutoff:] -} - -func copyLabels(dest *labels.Labels, src labels.Labels) { - if len(*dest) < cap(src) { - *dest = make([]labels.Label, len(src)) - } - *dest = (*dest)[:len(src)] - copy(*dest, src) -} - // sortWithoutLabels removes given labels from series and re-sorts the series responses that the same // series with different labels are coming right after each other. Other types of responses are moved to front. func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]struct{}) { diff --git a/pkg/store/proxy_heap_test.go b/pkg/store/proxy_heap_test.go index fdfec178ca5..50fe2d46beb 100644 --- a/pkg/store/proxy_heap_test.go +++ b/pkg/store/proxy_heap_test.go @@ -82,33 +82,6 @@ func TestProxyResponseHeapSort(t *testing.T) { storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")), }, }, - { - title: "merge duplicated sets that were ordered before adding external labels", - input: []respSet{ - &eagerRespSet{ - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - storeLabels: map[string]struct{}{"c": {}}, - }, - &eagerRespSet{ - wg: &sync.WaitGroup{}, - bufferedResponses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - storeLabels: map[string]struct{}{"c": {}}, - }, - }, - exp: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), - }, - }, { title: "merge repeated series in stores with different external labels", input: []respSet{ @@ -190,6 +163,37 @@ func TestProxyResponseHeapSort(t *testing.T) { storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), }, }, + { + title: "test", + input: []respSet{ + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + storeLabels: map[string]struct{}{"receive": {}, "tenant_id": {}, "thanos_replica": {}}, + }, + &eagerRespSet{ + wg: &sync.WaitGroup{}, + bufferedResponses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + storeLabels: map[string]struct{}{"cluster": {}, "prometheus": {}, "prometheus_replica": {}, "receive": {}, "tenant_id": {}, "thanos_replica": {}, "thanos_ruler_replica": {}}, + }, + }, + exp: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), + }, + }, } { t.Run(tcase.title, func(t *testing.T) { h := NewProxyResponseHeap(tcase.input...) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 73604b92365..f1bd44040d5 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -175,7 +175,7 @@ type CloseDelegator interface { // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), r.WithoutReplicaLabels) + srv := newFlushableServer(seriesSrv) match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) if err != nil {