diff --git a/CHANGELOG.md b/CHANGELOG.md
index d84085fadf..a9c4d8c67c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -45,6 +45,7 @@
 * [ENHANCEMENT] Distributor: Expose `cortex_label_size_bytes` native histogram metric. #6372
 * [ENHANCEMENT] Add new option `-server.grpc_server-num-stream-workers` to configure the number of worker goroutines that should be used to process incoming streams. #6386
 * [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358
+* [ENHANCEMENT] Ingester: Make sure an unregistered ingester joins the ring after WAL replay. #6277
 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
 * [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
 * [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled #6271
diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go
index 7da85dc856..09a3929d64 100644
--- a/pkg/compactor/compactor_test.go
+++ b/pkg/compactor/compactor_test.go
@@ -1043,6 +1043,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
 	tsdbPlanner.AssertNumberOfCalls(t, "Plan", 2)
 
 	assert.ElementsMatch(t, []string{
+		`level=info component=compactor msg="auto joined with new tokens" ring=compactor state=ACTIVE`,
 		`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
 		`level=info component=compactor msg="compactor is ACTIVE in the ring"`,
 		`level=info component=compactor msg="compactor started"`,
@@ -1836,6 +1837,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
 	assert.Equal(t, context.DeadlineExceeded, err)
 
 	assert.ElementsMatch(t, []string{
+		`level=info component=compactor msg="auto joined with new tokens" ring=compactor state=JOINING`,
 		`level=info component=compactor msg="compactor started"`,
 		`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
 		`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
diff --git a/pkg/ring/kv/dynamodb/client.go b/pkg/ring/kv/dynamodb/client.go
index 75fef517b3..71de47f0e5 100644
--- a/pkg/ring/kv/dynamodb/client.go
+++ b/pkg/ring/kv/dynamodb/client.go
@@ -199,6 +199,7 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou
 
 		if len(putRequests) == 0 && len(deleteRequests) == 0 {
 			// no change detected, retry
+			level.Warn(c.logger).Log("msg", "no change detected in ring, retry CAS")
 			bo.Wait()
 			continue
 		}
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index d8d1543f41..7e8b033e83 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -491,7 +491,8 @@ func (i *Lifecycler) loop(ctx context.Context) error {
 	joined := false
 	// First, see if we exist in the cluster, update our state to match if we do,
 	// and add ourselves (without tokens) if we don't.
-	if err := i.initRing(context.Background()); err != nil {
+	addedInRing, err := i.initRing(context.Background())
+	if err != nil {
 		return errors.Wrapf(err, "failed to join the ring %s", i.RingName)
 	}
 
@@ -504,18 +505,38 @@ func (i *Lifecycler) loop(ctx context.Context) error {
 	}
 
 	var heartbeatTickerChan <-chan time.Time
-	if uint64(i.cfg.HeartbeatPeriod) > 0 {
-		heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
-		heartbeatTicker.Stop()
-		// We are jittering for at least half of the time and max the time of the heartbeat.
-		// If we jitter too soon, we can have problems of concurrency with autoJoin leaving the instance on ACTIVE without tokens
-		time.AfterFunc(time.Duration(uint64(i.cfg.HeartbeatPeriod/2)+uint64(mathrand.Int63())%uint64(i.cfg.HeartbeatPeriod/2)), func() {
-			i.heartbeat(ctx)
-			heartbeatTicker.Reset(i.cfg.HeartbeatPeriod)
-		})
-		defer heartbeatTicker.Stop()
-
-		heartbeatTickerChan = heartbeatTicker.C
+	// The ticker and its jitter timer are owned by loop (not by the
+	// startHeartbeat closure) so that they are released when loop returns.
+	var (
+		heartbeatTicker *time.Ticker
+		heartbeatJitter *time.Timer
+	)
+	defer func() {
+		if heartbeatJitter != nil {
+			heartbeatJitter.Stop()
+		}
+		if heartbeatTicker != nil {
+			heartbeatTicker.Stop()
+		}
+	}()
+	startHeartbeat := func() {
+		// Nothing to do when heartbeating is disabled or already started.
+		if uint64(i.cfg.HeartbeatPeriod) == 0 || heartbeatTicker != nil {
+			return
+		}
+		ticker := time.NewTicker(i.cfg.HeartbeatPeriod)
+		ticker.Stop()
+		heartbeatTicker = ticker
+		heartbeatTickerChan = ticker.C
+		// We are jittering for at least half of the time and max the time of the heartbeat.
+		// If we jitter too soon, we can have problems of concurrency with autoJoin leaving the instance on ACTIVE without tokens
+		heartbeatJitter = time.AfterFunc(time.Duration(uint64(i.cfg.HeartbeatPeriod/2)+uint64(mathrand.Int63())%uint64(i.cfg.HeartbeatPeriod/2)), func() {
+			i.heartbeat(ctx)
+			ticker.Reset(i.cfg.HeartbeatPeriod)
+		})
+	}
+	if addedInRing {
+		startHeartbeat()
 	}
 
 	for {
@@ -536,17 +557,21 @@ func (i *Lifecycler) loop(ctx context.Context) error {
 				if i.cfg.ObservePeriod > 0 {
 					// let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING
 					// ingesters, but we also signal that it is not fully functional yet.
-					if err := i.autoJoin(context.Background(), JOINING); err != nil {
+					if err := i.autoJoin(context.Background(), JOINING, addedInRing); err != nil {
 						return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
 					}
 
 					level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
 					observeChan = time.After(i.cfg.ObservePeriod)
 				} else {
-					if err := i.autoJoin(context.Background(), i.getPreviousState()); err != nil {
+					if err := i.autoJoin(context.Background(), i.getPreviousState(), addedInRing); err != nil {
 						return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s, state: %s", i.RingName, i.getPreviousState())
 					}
 				}
+
+				if !addedInRing {
+					startHeartbeat()
+				}
 			}
 
 		case <-observeChan:
@@ -565,6 +590,10 @@ func (i *Lifecycler) loop(ctx context.Context) error {
 				if err != nil {
 					level.Error(i.logger).Log("msg", "failed to set state", "ring", i.RingName, "state", i.getPreviousState(), "err", err)
 				}
+
+				if !addedInRing {
+					startHeartbeat()
+				}
 			} else {
 				level.Info(i.logger).Log("msg", "token verification failed, observing", "ring", i.RingName)
 				// keep observing
@@ -653,12 +682,13 @@ heartbeatLoop:
 // initRing is the first thing we do when we start. It:
 // - add an ingester entry to the ring
 // - copies out our state and tokens if they exist
-func (i *Lifecycler) initRing(ctx context.Context) error {
+func (i *Lifecycler) initRing(ctx context.Context) (bool, error) {
 	var (
 		ringDesc       *Desc
 		tokensFromFile Tokens
 		err            error
 	)
+	addedInRing := true
 
 	if i.cfg.TokensFilePath != "" {
 		tokenFile, err := i.loadTokenFile()
@@ -692,10 +722,15 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
 				level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
 				if len(tokensFromFile) >= i.cfg.NumTokens && i.autoJoinOnStartup {
 					i.setState(i.getPreviousState())
+					state := i.GetState()
+					ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, state, registeredAt)
+					level.Info(i.logger).Log("msg", "auto join on startup, adding with token and state", "ring", i.RingName, "state", state)
+					return ringDesc, true, nil
 				}
-				ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt)
 				i.setTokens(tokensFromFile)
-				return ringDesc, true, nil
+				// Do not return ring to CAS call since instance has not been added to ring yet.
+				addedInRing = false
+				return nil, true, nil
 			}
 
 			// Either we are a new ingester, or consul must have restarted
@@ -760,7 +795,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
 		i.updateCounters(ringDesc)
 	}
 
-	return err
+	return addedInRing, err
 }
 
 func (i *Lifecycler) RenewTokens(ratio float64, ctx context.Context) {
@@ -875,7 +910,7 @@ func (i *Lifecycler) compareTokens(fromRing Tokens) bool {
 }
 
 // autoJoin selects random tokens & moves state to targetState
-func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) error {
+func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState, alreadyInRing bool) error {
 	var ringDesc *Desc
 
 	err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
@@ -890,11 +925,16 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
 		// At this point, we should not have any tokens, and we should be in PENDING state.
 		// Need to make sure we didn't change the num of tokens configured
 		myTokens, _ := ringDesc.TokensFor(i.ID)
+		if !alreadyInRing {
+			myTokens = i.getTokens()
+		}
 		needTokens := i.cfg.NumTokens - len(myTokens)
 
 		if needTokens == 0 && myTokens.Equals(i.getTokens()) {
 			// Tokens have been verified. No need to change them.
-			ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
+			state := i.GetState()
+			ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), state, i.getRegisteredAt())
+			level.Info(i.logger).Log("msg", "auto joined with existing tokens", "ring", i.RingName, "state", state)
 			return ringDesc, true, nil
 		}
 
@@ -908,7 +948,9 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
 		sort.Sort(myTokens)
 		i.setTokens(myTokens)
 
-		ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
+		state := i.GetState()
+		ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), state, i.getRegisteredAt())
+		level.Info(i.logger).Log("msg", "auto joined with new tokens", "ring", i.RingName, "state", state)
 
 		return ringDesc, true, nil
 	})
diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go
index 2822695b2e..035cfc8f1b 100644
--- a/pkg/ring/lifecycler_test.go
+++ b/pkg/ring/lifecycler_test.go
@@ -827,6 +827,106 @@ func TestTokenFileOnDisk(t *testing.T) {
 	}
 }
 
+func TestTokenFileOnDisk_WithoutAutoJoinOnStartup(t *testing.T) {
+	ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
+	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+
+	var ringConfig Config
+	flagext.DefaultValues(&ringConfig)
+	ringConfig.KVStore.Mock = ringStore
+
+	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
+	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
+
+	tokenDir := t.TempDir()
+
+	lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
+	lifecyclerConfig.NumTokens = 512
+	lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"
+
+	// Start first ingester.
+	l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, false, true, log.NewNopLogger(), nil)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))
+
+	// First ingester joins the ring
+	l1.Join()
+
+	// Check this ingester joined, is active, and has 512 tokens.
+	var expTokens []uint32
+	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
+		d, err := r.KVClient.Get(context.Background(), ringKey)
+		require.NoError(t, err)
+
+		desc, ok := d.(*Desc)
+		if ok {
+			expTokens = desc.Ingesters["ing1"].Tokens
+		}
+		return ok &&
+			len(desc.Ingesters) == 1 &&
+			desc.Ingesters["ing1"].State == ACTIVE &&
+			len(desc.Ingesters["ing1"].Tokens) == 512
+	})
+
+	// Change state from ACTIVE to READONLY
+	err = l1.ChangeState(context.Background(), READONLY)
+	require.NoError(t, err)
+	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
+		d, err := r.KVClient.Get(context.Background(), ringKey)
+		require.NoError(t, err)
+
+		desc, ok := d.(*Desc)
+		return ok &&
+			desc.Ingesters["ing1"].State == READONLY
+	})
+
+	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1))
+
+	// Start new ingester at same token directory.
+	lifecyclerConfig.ID = "ing2"
+	l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, false, true, log.NewNopLogger(), nil)
+	require.NoError(t, err)
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))
+	defer services.StopAndAwaitTerminated(context.Background(), l2) //nolint:errcheck
+
+	// Check this ingester is not in the ring before calling Join
+	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
+		d, err := r.KVClient.Get(context.Background(), ringKey)
+		require.NoError(t, err)
+		desc, ok := d.(*Desc)
+		if ok {
+			_, ingesterInRing := desc.Ingesters["ing2"]
+			return !ingesterInRing
+		}
+		return ok
+	})
+
+	// New ingester joins the ring
+	l2.Join()
+
+	// Check this ingester joined, is in readonly state, and has 512 tokens.
+	var actTokens []uint32
+	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
+		d, err := r.KVClient.Get(context.Background(), ringKey)
+		require.NoError(t, err)
+		desc, ok := d.(*Desc)
+		if ok {
+			actTokens = desc.Ingesters["ing2"].Tokens
+		}
+		return ok &&
+			len(desc.Ingesters) == 1 &&
+			desc.Ingesters["ing2"].State == READONLY &&
+			len(desc.Ingesters["ing2"].Tokens) == 512
+	})
+
+	// Check for same tokens.
+	sort.Slice(expTokens, func(i, j int) bool { return expTokens[i] < expTokens[j] })
+	sort.Slice(actTokens, func(i, j int) bool { return actTokens[i] < actTokens[j] })
+	require.Equal(t, expTokens, actTokens)
+}
+
 // JoinInLeavingState ensures that if the lifecycler starts up and the ring already has it in a LEAVING state that it still is able to auto join
 func TestJoinInLeavingState(t *testing.T) {
 	ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)