Skip to content

Commit

Permalink
Store: always sort, just compare labelset in proxy heap
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
mhoffm-aiven authored and MichaHoffmann committed Sep 6, 2023
1 parent d1edf74 commit 882e417
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 93 deletions.
2 changes: 1 addition & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill

// Series implements the storepb.StoreServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) {
srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), req.WithoutReplicaLabels)
srv := newFlushableServer(seriesSrv)
if s.queryGate != nil {
tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
err = s.queryGate.Start(srv.Context())
Expand Down
16 changes: 1 addition & 15 deletions pkg/store/flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/stringset"
)

// flushableServer is an extension of storepb.Store_SeriesServer with a Flush method.
Expand All @@ -20,23 +19,10 @@ type flushableServer interface {

func newFlushableServer(
upstream storepb.Store_SeriesServer,
labelNames stringset.Set,
replicaLabels []string,
) flushableServer {
if labelNames.HasAny(replicaLabels) {
return &resortingServer{Store_SeriesServer: upstream}
}
return &passthroughServer{Store_SeriesServer: upstream}
return &resortingServer{Store_SeriesServer: upstream}
}

// passthroughServer is a flushableServer that forwards all data to
// an upstream server without additional processing.
// Its only field is the embedded upstream storepb.Store_SeriesServer, so
// the upstream's methods are promoted and called on it directly.
type passthroughServer struct {
storepb.Store_SeriesServer
}

// Flush is a no-op that always returns nil; passthroughServer keeps no
// state of its own that would have to be flushed.
func (p *passthroughServer) Flush() error { return nil }

// resortingServer is a flushableServer that resorts all series by their labels.
// This is required if replica labels are stored internally in a TSDB.
// Data is resorted and sent to an upstream server upon calling Flush.
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (p *PrometheusStore) putBuffer(b *[]byte) {

// Series returns all series for a requested time range and label matcher.
func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error {
s := newFlushableServer(seriesSrv, p.labelNamesSet(), r.WithoutReplicaLabels)
s := newFlushableServer(seriesSrv)
extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
Expand Down
50 changes: 2 additions & 48 deletions pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,36 +164,18 @@ func (d *dedupResponseHeap) At() *storepb.SeriesResponse {
// tournament trees need n-1 auxiliary nodes so there
// might not be much of a difference.
type ProxyResponseHeap struct {
nodes []ProxyResponseHeapNode
iLblsScratch labels.Labels
jLblsScratch labels.Labels
nodes []ProxyResponseHeapNode
}

func (h *ProxyResponseHeap) Less(i, j int) bool {
iResp := h.nodes[i].rs.At()
jResp := h.nodes[j].rs.At()

if iResp.GetSeries() != nil && jResp.GetSeries() != nil {
// Response sets are sorted before adding external labels.
// This comparison excludes those labels to keep the same order.
iStoreLbls := h.nodes[i].rs.StoreLabels()
jStoreLbls := h.nodes[j].rs.StoreLabels()

iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels)
jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels)

copyLabels(&h.iLblsScratch, iLbls)
copyLabels(&h.jLblsScratch, jLbls)

var iExtLbls, jExtLbls labels.Labels
h.iLblsScratch, iExtLbls = dropLabels(h.iLblsScratch, iStoreLbls)
h.jLblsScratch, jExtLbls = dropLabels(h.jLblsScratch, jStoreLbls)

c := labels.Compare(h.iLblsScratch, h.jLblsScratch)
if c != 0 {
return c < 0
}
return labels.Compare(iExtLbls, jExtLbls) < 0
return labels.Compare(iLbls, jLbls) < 0
} else if iResp.GetSeries() == nil && jResp.GetSeries() != nil {
return true
} else if iResp.GetSeries() != nil && jResp.GetSeries() == nil {
Expand Down Expand Up @@ -794,34 +776,6 @@ func rmLabels(l labels.Labels, labelsToRemove map[string]struct{}) labels.Labels
return l
}

// dropLabels partitions l in place into kept and dropped labels and returns
// them in that order. A label is dropped when its name is a key of
// labelsToDrop. Both returned sets preserve the relative order the labels had
// in l, and both are sub-slices of l's backing array, so l is modified.
func dropLabels(l labels.Labels, labelsToDrop map[string]struct{}) (labels.Labels, labels.Labels) {
	// kept marks the boundary: l[:kept] is still to be scanned or retained,
	// l[kept:] holds the labels dropped so far.
	kept := len(l)
	for pos := 0; pos < kept; {
		if _, drop := labelsToDrop[l[pos].Name]; !drop {
			pos++
			continue
		}

		// Rotate the dropped label to the very end: shift everything after it
		// one slot to the left and store it in the freed last slot. pos is not
		// advanced because a new, unexamined label now sits at that index.
		dropped := l[pos]
		copy(l[pos:], l[pos+1:])
		l[len(l)-1] = dropped
		kept--
	}

	return l[:kept], l[kept:]
}

// copyLabels copies src into *dest so that *dest ends up with length len(src)
// and the same elements as src. The backing array of *dest is reused whenever
// its capacity is sufficient; a new one is allocated only when it is too small.
func copyLabels(dest *labels.Labels, src labels.Labels) {
	// What matters for reuse is how much dest can hold (its capacity) versus
	// how much has to be stored (src's length). Comparing len(*dest) with
	// cap(src) instead forces needless reallocations of a scratch buffer that
	// was merely resliced to a shorter length by an earlier use.
	if cap(*dest) < len(src) {
		*dest = make([]labels.Label, len(src))
	}
	*dest = (*dest)[:len(src)]
	copy(*dest, src)
}

// sortWithoutLabels removes the given labels from the series and re-sorts the series responses so that the same
// series with different labels come right after each other. Other types of responses are moved to the front.
func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string]struct{}) {
Expand Down
58 changes: 31 additions & 27 deletions pkg/store/proxy_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,33 +82,6 @@ func TestProxyResponseHeapSort(t *testing.T) {
storeSeriesResponse(t, labelsFromStrings("g", "7", "h", "8", "i", "9")),
},
},
{
title: "merge duplicated sets that were ordered before adding external labels",
input: []respSet{
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
},
storeLabels: map[string]struct{}{"c": {}},
},
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
},
storeLabels: map[string]struct{}{"c": {}},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
},
},
{
title: "merge repeated series in stores with different external labels",
input: []respSet{
Expand Down Expand Up @@ -190,6 +163,37 @@ func TestProxyResponseHeapSort(t *testing.T) {
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
},
},
{
title: "test",
input: []respSet{
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
},
storeLabels: map[string]struct{}{"receive": {}, "tenant_id": {}, "thanos_replica": {}},
},
&eagerRespSet{
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
},
storeLabels: map[string]struct{}{"cluster": {}, "prometheus": {}, "prometheus_replica": {}, "receive": {}, "tenant_id": {}, "thanos_replica": {}, "thanos_ruler_replica": {}},
},
},
exp: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.5.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.6.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
},
},
} {
t.Run(tcase.title, func(t *testing.T) {
h := NewProxyResponseHeap(tcase.input...)
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ type CloseDelegator interface {
// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error {
srv := newFlushableServer(seriesSrv, s.LabelNamesSet(), r.WithoutReplicaLabels)
srv := newFlushableServer(seriesSrv)

match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset())
if err != nil {
Expand Down

0 comments on commit 882e417

Please sign in to comment.