Skip to content

Commit

Permalink
Store: always sort, just compare labelset in proxy heap
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
mhoffm-aiven authored and MichaHoffmann committed Sep 6, 2023
1 parent d1edf74 commit c37f0ed
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill

// Series implements the storepb.StoreServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) {
srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels)
srv := newFlushableServer(seriesSrv)
if s.queryGate != nil {
tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
err = s.queryGate.Start(srv.Context())
Expand Down
8 changes: 1 addition & 7 deletions pkg/store/flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/stringset"
)

// flushableServer is an extension of storepb.Store_SeriesServer with a Flush method.
Expand All @@ -20,13 +19,8 @@ type flushableServer interface {

func newFlushableServer(
upstream storepb.Store_SeriesServer,
labelNames stringset.Set,
replicaLabels []string,
) flushableServer {
if labelNames.HasAny(replicaLabels) {
return &resortingServer{Store_SeriesServer: upstream}
}
return &passthroughServer{Store_SeriesServer: upstream}
return &resortingServer{Store_SeriesServer: upstream}
}

// passthroughServer is a flushableServer that forwards all data to
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (p *PrometheusStore) putBuffer(b *[]byte) {

// Series returns all series for a requested time range and label matcher.
func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error {
s := newFlushableServer(seriesSrv, p.labelNamesSet(), r.WithoutReplicaLabels)
s := newFlushableServer(seriesSrv)
extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
Expand Down
18 changes: 1 addition & 17 deletions pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,26 +174,10 @@ func (h *ProxyResponseHeap) Less(i, j int) bool {
jResp := h.nodes[j].rs.At()

if iResp.GetSeries() != nil && jResp.GetSeries() != nil {
// Response sets are sorted before adding external labels.
// This comparison excludes those labels to keep the same order.
iStoreLbls := h.nodes[i].rs.StoreLabels()
jStoreLbls := h.nodes[j].rs.StoreLabels()

iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels)
jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels)

copyLabels(&h.iLblsScratch, iLbls)
copyLabels(&h.jLblsScratch, jLbls)

var iExtLbls, jExtLbls labels.Labels
h.iLblsScratch, iExtLbls = dropLabels(h.iLblsScratch, iStoreLbls)
h.jLblsScratch, jExtLbls = dropLabels(h.jLblsScratch, jStoreLbls)

c := labels.Compare(h.iLblsScratch, h.jLblsScratch)
if c != 0 {
return c < 0
}
return labels.Compare(iExtLbls, jExtLbls) < 0
return labels.Compare(iLbls, jLbls) < 0
} else if iResp.GetSeries() == nil && jResp.GetSeries() != nil {
return true
} else if iResp.GetSeries() != nil && jResp.GetSeries() == nil {
Expand Down
31 changes: 31 additions & 0 deletions pkg/store/proxy_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,37 @@ func TestProxyResponseHeapSort(t *testing.T) {
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
},
},
{
title: "test",
input: []respSet{
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
},
storeLabels: map[string]struct{}{"receive": {}, "tenant_id": {}, "thanos_replica": {}},
},
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
},
storeLabels: map[string]struct{}{"cluster": {}, "prometheus": {}, "prometheus_replica": {}, "receive": {}, "tenant_id": {}, "thanos_replica": {}, "thanos_ruler_replica": {}},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
},
},
} {
t.Run(tcase.title, func(t *testing.T) {
h := NewProxyResponseHeap(tcase.input...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ type CloseDelegator interface {
// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error {
srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), r.WithoutReplicaLabels)
srv := newFlushableServer(seriesSrv)

match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset())
if err != nil {
Expand Down

0 comments on commit c37f0ed

Please sign in to comment.