Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure unregistered ingester joining the ring after WAL replay #6277

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
* [ENHANCEMENT] Distributor: Expose `cortex_label_size_bytes` native histogram metric. #6372
* [ENHANCEMENT] Add new option `-server.grpc_server-num-stream-workers` to configure the number of worker goroutines that should be used to process incoming streams. #6386
* [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358
* [ENHANCEMENT] Ingester: Make sure unregistered ingester joining the ring after WAL replay #6277
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled #6271
Expand Down
2 changes: 2 additions & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
tsdbPlanner.AssertNumberOfCalls(t, "Plan", 2)

assert.ElementsMatch(t, []string{
`level=info component=compactor msg="auto joined with new tokens" ring=compactor state=ACTIVE`,
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
`level=info component=compactor msg="compactor is ACTIVE in the ring"`,
`level=info component=compactor msg="compactor started"`,
Expand Down Expand Up @@ -1836,6 +1837,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
assert.Equal(t, context.DeadlineExceeded, err)

assert.ElementsMatch(t, []string{
`level=info component=compactor msg="auto joined with new tokens" ring=compactor state=JOINING`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
Expand Down
1 change: 1 addition & 0 deletions pkg/ring/kv/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou

if len(putRequests) == 0 && len(deleteRequests) == 0 {
// no change detected, retry
level.Warn(c.logger).Log("msg", "no change detected in ring, retry CAS")
bo.Wait()
continue
}
Expand Down
71 changes: 49 additions & 22 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,8 @@ func (i *Lifecycler) loop(ctx context.Context) error {
joined := false
// First, see if we exist in the cluster, update our state to match if we do,
// and add ourselves (without tokens) if we don't.
if err := i.initRing(context.Background()); err != nil {
addedInRing, err := i.initRing(context.Background())
if err != nil {
return errors.Wrapf(err, "failed to join the ring %s", i.RingName)
}

Expand All @@ -504,18 +505,23 @@ func (i *Lifecycler) loop(ctx context.Context) error {
}

var heartbeatTickerChan <-chan time.Time
if uint64(i.cfg.HeartbeatPeriod) > 0 {
heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
heartbeatTicker.Stop()
// We are jittering for at least half of the time and max the time of the heartbeat.
// If we jitter too soon, we can have problems of concurrency with autoJoin leaving the instance on ACTIVE without tokens
time.AfterFunc(time.Duration(uint64(i.cfg.HeartbeatPeriod/2)+uint64(mathrand.Int63())%uint64(i.cfg.HeartbeatPeriod/2)), func() {
i.heartbeat(ctx)
heartbeatTicker.Reset(i.cfg.HeartbeatPeriod)
})
defer heartbeatTicker.Stop()

heartbeatTickerChan = heartbeatTicker.C
startHeartbeat := func() {
if uint64(i.cfg.HeartbeatPeriod) > 0 {
heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
heartbeatTicker.Stop()
// We are jittering for at least half of the time and max the time of the heartbeat.
// If we jitter too soon, we can have problems of concurrency with autoJoin leaving the instance on ACTIVE without tokens
time.AfterFunc(time.Duration(uint64(i.cfg.HeartbeatPeriod/2)+uint64(mathrand.Int63())%uint64(i.cfg.HeartbeatPeriod/2)), func() {
i.heartbeat(ctx)
heartbeatTicker.Reset(i.cfg.HeartbeatPeriod)
})
defer heartbeatTicker.Stop()

heartbeatTickerChan = heartbeatTicker.C
}
}
if addedInRing {
startHeartbeat()
}

for {
Expand All @@ -536,17 +542,21 @@ func (i *Lifecycler) loop(ctx context.Context) error {
if i.cfg.ObservePeriod > 0 {
// let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING
// ingesters, but we also signal that it is not fully functional yet.
if err := i.autoJoin(context.Background(), JOINING); err != nil {
if err := i.autoJoin(context.Background(), JOINING, addedInRing); err != nil {
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
}

level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
observeChan = time.After(i.cfg.ObservePeriod)
} else {
if err := i.autoJoin(context.Background(), i.getPreviousState()); err != nil {
if err := i.autoJoin(context.Background(), i.getPreviousState(), addedInRing); err != nil {
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s, state: %s", i.RingName, i.getPreviousState())
}
}

if !addedInRing {
startHeartbeat()
}
}

case <-observeChan:
Expand All @@ -565,6 +575,10 @@ func (i *Lifecycler) loop(ctx context.Context) error {
if err != nil {
level.Error(i.logger).Log("msg", "failed to set state", "ring", i.RingName, "state", i.getPreviousState(), "err", err)
}

if !addedInRing {
startHeartbeat()
}
} else {
level.Info(i.logger).Log("msg", "token verification failed, observing", "ring", i.RingName)
// keep observing
Expand Down Expand Up @@ -653,12 +667,13 @@ heartbeatLoop:
// initRing is the first thing we do when we start. It:
// - add an ingester entry to the ring
// - copies out our state and tokens if they exist
func (i *Lifecycler) initRing(ctx context.Context) error {
func (i *Lifecycler) initRing(ctx context.Context) (bool, error) {
var (
ringDesc *Desc
tokensFromFile Tokens
err error
)
addedInRing := true

if i.cfg.TokensFilePath != "" {
tokenFile, err := i.loadTokenFile()
Expand Down Expand Up @@ -692,10 +707,15 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
if len(tokensFromFile) >= i.cfg.NumTokens && i.autoJoinOnStartup {
i.setState(i.getPreviousState())
state := i.GetState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, state, registeredAt)
level.Info(i.logger).Log("msg", "auto join on startup, adding with token and state", "ring", i.RingName, "state", state)
return ringDesc, true, nil
}
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt)
i.setTokens(tokensFromFile)
return ringDesc, true, nil
// Do not return ring to CAS call since instance has not been added to ring yet.
addedInRing = false
return nil, true, nil
}

// Either we are a new ingester, or consul must have restarted
Expand Down Expand Up @@ -760,7 +780,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
i.updateCounters(ringDesc)
}

return err
return addedInRing, err
}

func (i *Lifecycler) RenewTokens(ratio float64, ctx context.Context) {
Expand Down Expand Up @@ -875,7 +895,7 @@ func (i *Lifecycler) compareTokens(fromRing Tokens) bool {
}

// autoJoin selects random tokens & moves state to targetState
func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) error {
func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState, alreadyInRing bool) error {
var ringDesc *Desc

err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
Expand All @@ -890,11 +910,16 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
// At this point, we should not have any tokens, and we should be in PENDING state.
// Need to make sure we didn't change the num of tokens configured
myTokens, _ := ringDesc.TokensFor(i.ID)
if !alreadyInRing {
myTokens = i.getTokens()
}
needTokens := i.cfg.NumTokens - len(myTokens)

if needTokens == 0 && myTokens.Equals(i.getTokens()) {
// Tokens have been verified. No need to change them.
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
state := i.GetState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), state, i.getRegisteredAt())
level.Info(i.logger).Log("msg", "auto joined with existing tokens", "ring", i.RingName, "state", state)
return ringDesc, true, nil
}

Expand All @@ -908,7 +933,9 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
sort.Sort(myTokens)
i.setTokens(myTokens)

ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
state := i.GetState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), state, i.getRegisteredAt())
level.Info(i.logger).Log("msg", "auto joined with new tokens", "ring", i.RingName, "state", state)

return ringDesc, true, nil
})
Expand Down
102 changes: 102 additions & 0 deletions pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,108 @@ func TestTokenFileOnDisk(t *testing.T) {
}
}

func TestTokenFileOnDisk_WithoutAutoJoinOnStartup(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

tokenDir := t.TempDir()

lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig.NumTokens = 512
lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"

// Start first ingester.
l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, false, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))

// First ingester joins the ring
l1.Join()

// Check this ingester joined, is active, and has 512 token.
var expTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
if ok {
expTokens = desc.Ingesters["ing1"].Tokens
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing1"].State == ACTIVE &&
len(desc.Ingesters["ing1"].Tokens) == 512
})

// Change state from ACTIVE to READONLY
err = l1.ChangeState(context.Background(), READONLY)
require.NoError(t, err)
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
return ok &&
desc.Ingesters["ing1"].State == READONLY
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1))

// Start new ingester at same token directory.
lifecyclerConfig.ID = "ing2"
l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, false, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))
defer services.StopAndAwaitTerminated(context.Background(), l2) //nolint:errcheck

// Check this ingester should not in the ring before calling Join
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
_, ingesterInRing := desc.Ingesters["ing2"]
return !ingesterInRing
}
return ok
})

// New ingester joins the ring
l2.Join()

// Check this ingester joined, is in readonly state, and has 512 token.
var actTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
actTokens = desc.Ingesters["ing2"].Tokens
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing2"].State == READONLY &&
len(desc.Ingesters["ing2"].Tokens) == 512
})

// Check for same tokens.
sort.Slice(expTokens, func(i, j int) bool { return expTokens[i] < expTokens[j] })
sort.Slice(actTokens, func(i, j int) bool { return actTokens[i] < actTokens[j] })
for i := 0; i < 512; i++ {
require.Equal(t, expTokens, actTokens)
}
}

// JoinInLeavingState ensures that if the lifecycler starts up and the ring already has it in a LEAVING state that it still is able to auto join
func TestJoinInLeavingState(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
Expand Down
Loading