From d1c032f5b7288b22faf7c75186eff37b8d9b74e3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Thu, 1 Feb 2024 15:10:45 +0200
Subject: [PATCH] receive/handler: fix locking twice
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fix bug introduced in https://github.com/thanos-io/thanos/pull/6898: we
were RLock()ing twice. This leads to a deadlock in some situations.

Signed-off-by: Giedrius Statkevičius
---
 pkg/receive/handler.go      |  8 ----
 pkg/receive/handler_test.go | 76 +++++++++++++++++++++++++++++++++++++
 2 files changed, 76 insertions(+), 8 deletions(-)

diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index 6479d66027..7ddfc6fbf8 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -641,14 +641,6 @@ func (h *Handler) forward(ctx context.Context, tenantHTTP string, r replica, wre
 	span, ctx := tracing.StartSpan(ctx, "receive_fanout_forward")
 	defer span.Finish()
 
-	// It is possible that hashring is ready in testReady() but unready now,
-	// so need to lock here.
-	h.mtx.RLock()
-	if h.hashring == nil {
-		h.mtx.RUnlock()
-		return errors.New("hashring is not ready")
-	}
-
 	var replicas []uint64
 	if r.replicated {
 		replicas = []uint64{r.n}
diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go
index 17adf902c9..3d7bd7fa73 100644
--- a/pkg/receive/handler_test.go
+++ b/pkg/receive/handler_test.go
@@ -1709,3 +1709,79 @@ func TestDistributeSeries(t *testing.T) {
 	require.Equal(t, 1, labelpb.ZLabelsToPromLabels(remote[endpointReplica{endpoint: "http://localhost:9090", replica: 0}].timeSeries[1].Labels).Len())
 	require.Equal(t, map[string]struct{}{"bar": {}, "boo": {}}, hr.seenTenants)
 }
+
+func TestHandlerFlippingHashrings(t *testing.T) {
+	h := NewHandler(log.NewLogfmtLogger(os.Stderr), &Options{})
+	t.Cleanup(h.Close)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	h1, err := newSimpleHashring([]Endpoint{
+		{
+			Address: "http://localhost:9090",
+		},
+	})
+	require.NoError(t, err)
+	h2, err := newSimpleHashring([]Endpoint{
+		{
+			Address: "http://localhost:9091",
+		},
+	})
+	require.NoError(t, err)
+
+	h.Hashring(h1)
+
+	var wg sync.WaitGroup
+
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+
+		for {
+			select {
+			case <-time.After(50 * time.Millisecond):
+			case <-ctx.Done():
+				return
+			}
+
+			err := h.handleRequest(ctx, 0, "test", &prompb.WriteRequest{
+				Timeseries: []prompb.TimeSeries{
+					{
+						Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")),
+						Samples: []prompb.Sample{
+							{
+								Timestamp: time.Now().Unix(),
+								Value:     123,
+							},
+						},
+					},
+				},
+			})
+			require.Error(t, err)
+		}
+	}()
+	go func() {
+		defer wg.Done()
+		var flipper bool
+
+		for {
+			select {
+			case <-time.After(200 * time.Millisecond):
+			case <-ctx.Done():
+				return
+			}
+
+			if flipper {
+				h.Hashring(h2)
+			} else {
+				h.Hashring(h1)
+			}
+			flipper = !flipper
+		}
+	}()
+
+	<-time.After(1 * time.Second)
+	cancel()
+	wg.Wait()
+}