Skip to content

Commit

Permalink
[ES-1314123] race condition for memorize tsdb client
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Nov 25, 2024
1 parent b0aff72 commit 7abfdf9
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 56 deletions.
2 changes: 1 addition & 1 deletion docs/sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Queries against store gateway which are touching large number of blocks (no matt

# Relabelling

Similar to [promtail](https://grafana.com/docs/loki/latest/send-data/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.
Similar to [promtail](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.

Currently, thanos only supports the following relabel actions:

Expand Down
70 changes: 15 additions & 55 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ type MultiTSDB struct {
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig

tsdbClients []store.Client
tsdbClientsNeedUpdate bool

exemplarClients map[string]*exemplars.TSDB
exemplarClientsNeedUpdate bool

metricNameFilterEnabled bool
}

Expand Down Expand Up @@ -100,19 +94,17 @@ func NewMultiTSDB(
}

mt := &MultiTSDB{
dataDir: dataDir,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tsdbClientsNeedUpdate: true,
exemplarClientsNeedUpdate: true,
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
dataDir: dataDir,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
}

for _, option := range options {
Expand Down Expand Up @@ -435,8 +427,6 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {

level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
delete(t.tenants, tenantID)
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
}

return merr.Err()
Expand Down Expand Up @@ -598,17 +588,7 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error {

func (t *MultiTSDB) TSDBLocalClients() []store.Client {
t.mtx.RLock()
if !t.tsdbClientsNeedUpdate {
t.mtx.RUnlock()
return t.tsdbClients
}

t.mtx.RUnlock()
t.mtx.Lock()
defer t.mtx.Unlock()
if !t.tsdbClientsNeedUpdate {
return t.tsdbClients
}
defer t.mtx.RUnlock()

res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
Expand All @@ -618,25 +598,12 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client {
}
}

t.tsdbClientsNeedUpdate = false
t.tsdbClients = res

return t.tsdbClients
return res
}

func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
t.mtx.RLock()
if !t.exemplarClientsNeedUpdate {
t.mtx.RUnlock()
return t.exemplarClients
}
t.mtx.RUnlock()
t.mtx.Lock()
defer t.mtx.Unlock()

if !t.exemplarClientsNeedUpdate {
return t.exemplarClients
}
defer t.mtx.RUnlock()

res := make(map[string]*exemplars.TSDB, len(t.tenants))
for k, tenant := range t.tenants {
Expand All @@ -645,10 +612,7 @@ func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
res[k] = e
}
}

t.exemplarClientsNeedUpdate = false
t.exemplarClients = res
return t.exemplarClients
return res
}

func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats {
Expand Down Expand Up @@ -725,8 +689,6 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
if err != nil {
t.mtx.Lock()
delete(t.tenants, tenantID)
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
t.mtx.Unlock()
return err
}
Expand Down Expand Up @@ -779,8 +741,6 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan

tenant = newTenant()
t.tenants[tenantID] = tenant
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
t.mtx.Unlock()

logger := log.With(t.logger, "tenant", tenantID)
Expand Down
36 changes: 36 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package receive

import (
"context"
"fmt"
"io"
"math"
"os"
Expand Down Expand Up @@ -541,6 +542,41 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
testutil.Equals(t, 1, len(m.TSDBLocalClients()))
}

func TestMultiTSDBAddNewTenant(t *testing.T) {
t.Parallel()

dir := t.TempDir()

m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(),
&tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
},
labels.FromStrings("replica", "test"),
"tenant_id",
objstore.NewInMemBucket(),
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()

concurrency := 20
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
testutil.Ok(t, appendSample(m, fmt.Sprintf("tenant-%d", i), time.UnixMilli(int64(10))))
}(i)
go func() {
m.TSDBLocalClients()
}()
}
wg.Wait()
testutil.Equals(t, concurrency, len(m.TSDBLocalClients()))
}

func TestAlignedHeadFlush(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 7abfdf9

Please sign in to comment.