
Commit

Make sure unregistered ingester joins the ring after WAL replay (#6277)

* Make sure ingester is active when joining the ring

Signed-off-by: Alex Le <[email protected]>

* Tokens from file would be ignored if the instance was not in the ring when starting

Signed-off-by: Alex Le <[email protected]>

* Skip CAS if the instance was not in the ring on start, and delay the heartbeat start time

Signed-off-by: Alex Le <[email protected]>

* Fix compactor tests

Signed-off-by: Alex Le <[email protected]>

* Fixed an issue where existing tokens were not loaded, and added a unit test

Signed-off-by: Alex Le <[email protected]>

* Update changelog

Signed-off-by: Alex Le <[email protected]>

---------

Signed-off-by: Alex Le <[email protected]>
alexqyle authored Dec 5, 2024
1 parent 7191ecb commit bdc357c
Showing 5 changed files with 155 additions and 22 deletions.
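
For orientation before the per-file diffs: the fix makes initRing report whether the instance was actually added to the ring on startup, and defers the heartbeat until the instance is registered. Below is a minimal, self-contained sketch of that pattern; registerInRing and the 100ms period are hypothetical stand-ins, not the actual Cortex code.

package main

import (
	"context"
	"fmt"
	"time"
)

// registerInRing is a hypothetical stand-in for Lifecycler.initRing. It
// reports whether the instance ended up registered in the ring on startup:
// with tokens loaded from file but no entry in the ring, the fixed code
// returns false so that autoJoin can add the instance (with its tokens) later.
func registerInRing(_ context.Context) (addedInRing bool, err error) {
	return false, nil
}

func main() {
	addedInRing, err := registerInRing(context.Background())
	if err != nil {
		panic(err)
	}

	heartbeats := make(chan struct{})
	startHeartbeat := func() {
		go func() {
			ticker := time.NewTicker(100 * time.Millisecond)
			defer ticker.Stop()
			for range ticker.C {
				heartbeats <- struct{}{}
			}
		}()
	}

	if addedInRing {
		// Already registered (e.g. the KV store kept our entry): heartbeat right away.
		startHeartbeat()
	} else {
		// Not in the ring yet: join first, then start the heartbeat,
		// so we never heartbeat a ring entry that does not exist.
		fmt.Println("auto-joining ring with tokens from file")
		startHeartbeat()
	}

	<-heartbeats
	fmt.Println("first heartbeat sent after registration")
}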
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -45,6 +45,7 @@
* [ENHANCEMENT] Distributor: Expose `cortex_label_size_bytes` native histogram metric. #6372
* [ENHANCEMENT] Add new option `-server.grpc_server-num-stream-workers` to configure the number of worker goroutines that should be used to process incoming streams. #6386
* [ENHANCEMENT] Distributor: Return HTTP 5XX instead of HTTP 4XX when instance limits are hit. #6358
* [ENHANCEMENT] Ingester: Make sure unregistered ingester joins the ring after WAL replay. #6277
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled #6271
2 changes: 2 additions & 0 deletions pkg/compactor/compactor_test.go
@@ -1043,6 +1043,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
tsdbPlanner.AssertNumberOfCalls(t, "Plan", 2)

assert.ElementsMatch(t, []string{
`level=info component=compactor msg="auto joined with new tokens" ring=compactor state=ACTIVE`,
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
`level=info component=compactor msg="compactor is ACTIVE in the ring"`,
`level=info component=compactor msg="compactor started"`,
@@ -1836,6 +1837,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
assert.Equal(t, context.DeadlineExceeded, err)

assert.ElementsMatch(t, []string{
`level=info component=compactor msg="auto joined with new tokens" ring=compactor state=JOINING`,
`level=info component=compactor msg="compactor started"`,
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
1 change: 1 addition & 0 deletions pkg/ring/kv/dynamodb/client.go
@@ -199,6 +199,7 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (ou

if len(putRequests) == 0 && len(deleteRequests) == 0 {
// no change detected, retry
level.Warn(c.logger).Log("msg", "no change detected in ring, retry CAS")
bo.Wait()
continue
}
71 changes: 49 additions & 22 deletions pkg/ring/lifecycler.go
@@ -491,7 +491,8 @@ func (i *Lifecycler) loop(ctx context.Context) error {
joined := false
// First, see if we exist in the cluster, update our state to match if we do,
// and add ourselves (without tokens) if we don't.
if err := i.initRing(context.Background()); err != nil {
addedInRing, err := i.initRing(context.Background())
if err != nil {
return errors.Wrapf(err, "failed to join the ring %s", i.RingName)
}

@@ -504,18 +505,23 @@
}

var heartbeatTickerChan <-chan time.Time
if uint64(i.cfg.HeartbeatPeriod) > 0 {
heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
heartbeatTicker.Stop()
// We jitter the first heartbeat by at least half of the heartbeat period and at most the full period.
// If it fires too soon, it can race with autoJoin and leave the instance ACTIVE without tokens.
time.AfterFunc(time.Duration(uint64(i.cfg.HeartbeatPeriod/2)+uint64(mathrand.Int63())%uint64(i.cfg.HeartbeatPeriod/2)), func() {
i.heartbeat(ctx)
heartbeatTicker.Reset(i.cfg.HeartbeatPeriod)
})
defer heartbeatTicker.Stop()

heartbeatTickerChan = heartbeatTicker.C
startHeartbeat := func() {
if uint64(i.cfg.HeartbeatPeriod) > 0 {
heartbeatTicker := time.NewTicker(i.cfg.HeartbeatPeriod)
heartbeatTicker.Stop()
// We jitter the first heartbeat by at least half of the heartbeat period and at most the full period.
// If it fires too soon, it can race with autoJoin and leave the instance ACTIVE without tokens.
time.AfterFunc(time.Duration(uint64(i.cfg.HeartbeatPeriod/2)+uint64(mathrand.Int63())%uint64(i.cfg.HeartbeatPeriod/2)), func() {
i.heartbeat(ctx)
heartbeatTicker.Reset(i.cfg.HeartbeatPeriod)
})
defer heartbeatTicker.Stop()

heartbeatTickerChan = heartbeatTicker.C
}
}
if addedInRing {
startHeartbeat()
}

for {
@@ -536,17 +542,21 @@
if i.cfg.ObservePeriod > 0 {
// let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING
// ingesters, but we also signal that it is not fully functional yet.
if err := i.autoJoin(context.Background(), JOINING); err != nil {
if err := i.autoJoin(context.Background(), JOINING, addedInRing); err != nil {
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
}

level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
observeChan = time.After(i.cfg.ObservePeriod)
} else {
if err := i.autoJoin(context.Background(), i.getPreviousState()); err != nil {
if err := i.autoJoin(context.Background(), i.getPreviousState(), addedInRing); err != nil {
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s, state: %s", i.RingName, i.getPreviousState())
}
}

if !addedInRing {
startHeartbeat()
}
}

case <-observeChan:
@@ -565,6 +575,10 @@
if err != nil {
level.Error(i.logger).Log("msg", "failed to set state", "ring", i.RingName, "state", i.getPreviousState(), "err", err)
}

if !addedInRing {
startHeartbeat()
}
} else {
level.Info(i.logger).Log("msg", "token verification failed, observing", "ring", i.RingName)
// keep observing
@@ -653,12 +667,13 @@ heartbeatLoop:
// initRing is the first thing we do when we start. It:
// - adds an ingester entry to the ring
// - copies out our state and tokens if they exist
func (i *Lifecycler) initRing(ctx context.Context) error {
func (i *Lifecycler) initRing(ctx context.Context) (bool, error) {
var (
ringDesc *Desc
tokensFromFile Tokens
err error
)
addedInRing := true

if i.cfg.TokensFilePath != "" {
tokenFile, err := i.loadTokenFile()
@@ -692,10 +707,15 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
if len(tokensFromFile) >= i.cfg.NumTokens && i.autoJoinOnStartup {
i.setState(i.getPreviousState())
state := i.GetState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, state, registeredAt)
level.Info(i.logger).Log("msg", "auto join on startup, adding with token and state", "ring", i.RingName, "state", state)
return ringDesc, true, nil
}
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt)
i.setTokens(tokensFromFile)
return ringDesc, true, nil
// Do not return the ring to the CAS call, since the instance has not been added to the ring yet.
addedInRing = false
return nil, true, nil
}

// Either we are a new ingester, or consul must have restarted
@@ -760,7 +780,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
i.updateCounters(ringDesc)
}

return err
return addedInRing, err
}

func (i *Lifecycler) RenewTokens(ratio float64, ctx context.Context) {
@@ -875,7 +895,7 @@ func (i *Lifecycler) compareTokens(fromRing Tokens) bool {
}

// autoJoin selects random tokens & moves state to targetState
func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) error {
func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState, alreadyInRing bool) error {
var ringDesc *Desc

err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
@@ -890,11 +910,16 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
// At this point, we should not have any tokens, and we should be in PENDING state.
// Need to make sure we didn't change the num of tokens configured
myTokens, _ := ringDesc.TokensFor(i.ID)
if !alreadyInRing {
myTokens = i.getTokens()
}
needTokens := i.cfg.NumTokens - len(myTokens)

if needTokens == 0 && myTokens.Equals(i.getTokens()) {
// Tokens have been verified. No need to change them.
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
state := i.GetState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), state, i.getRegisteredAt())
level.Info(i.logger).Log("msg", "auto joined with existing tokens", "ring", i.RingName, "state", state)
return ringDesc, true, nil
}

@@ -908,7 +933,9 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
sort.Sort(myTokens)
i.setTokens(myTokens)

ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
state := i.GetState()
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), state, i.getRegisteredAt())
level.Info(i.logger).Log("msg", "auto joined with new tokens", "ring", i.RingName, "state", state)

return ringDesc, true, nil
})
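A note on the jitter used for the first heartbeat in the lifecycler diff above: the delay is drawn uniformly from [period/2, period), so the first heartbeat can never fire before autoJoin has had a chance to assign tokens. A minimal sketch of just that calculation follows; it is a standalone illustration, not the full lifecycler wiring, and the zero-period guard is an added assumption.

package main

import (
	"fmt"
	mathrand "math/rand"
	"time"
)

// firstHeartbeatDelay mirrors the jitter in the diff above: a delay drawn
// uniformly from [period/2, period), so the first heartbeat fires no earlier
// than half a period after startup. The zero guard is an added safety check.
func firstHeartbeatDelay(period time.Duration) time.Duration {
	half := uint64(period / 2)
	if half == 0 {
		return 0
	}
	return time.Duration(half + uint64(mathrand.Int63())%half)
}

func main() {
	for i := 0; i < 3; i++ {
		// Prints three random delays in [7.5s, 15s) for a 15s heartbeat period.
		fmt.Println(firstHeartbeatDelay(15 * time.Second))
	}
}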
102 changes: 102 additions & 0 deletions pkg/ring/lifecycler_test.go
@@ -827,6 +827,108 @@ func TestTokenFileOnDisk(t *testing.T) {
}
}

func TestTokenFileOnDisk_WithoutAutoJoinOnStartup(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

tokenDir := t.TempDir()

lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig.NumTokens = 512
lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"

// Start first ingester.
l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, false, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))

// First ingester joins the ring
l1.Join()

// Check this ingester joined, is active, and has 512 tokens.
var expTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
if ok {
expTokens = desc.Ingesters["ing1"].Tokens
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing1"].State == ACTIVE &&
len(desc.Ingesters["ing1"].Tokens) == 512
})

// Change state from ACTIVE to READONLY
err = l1.ChangeState(context.Background(), READONLY)
require.NoError(t, err)
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
return ok &&
desc.Ingesters["ing1"].State == READONLY
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1))

// Start new ingester at same token directory.
lifecyclerConfig.ID = "ing2"
l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, false, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))
defer services.StopAndAwaitTerminated(context.Background(), l2) //nolint:errcheck

// Check this ingester is not in the ring before calling Join
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
_, ingesterInRing := desc.Ingesters["ing2"]
return !ingesterInRing
}
return ok
})

// New ingester joins the ring
l2.Join()

// Check this ingester joined, is in readonly state, and has 512 tokens.
var actTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
actTokens = desc.Ingesters["ing2"].Tokens
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing2"].State == READONLY &&
len(desc.Ingesters["ing2"].Tokens) == 512
})

// Check for same tokens.
sort.Slice(expTokens, func(i, j int) bool { return expTokens[i] < expTokens[j] })
sort.Slice(actTokens, func(i, j int) bool { return actTokens[i] < actTokens[j] })
require.Equal(t, expTokens, actTokens)
}

// JoinInLeavingState ensures that if the lifecycler starts up and the ring already has it in a LEAVING state that it still is able to auto join
func TestJoinInLeavingState(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
