Skip to content

Commit

Permalink
Fixed existing token not loaded issue and added unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Le <[email protected]>
  • Loading branch information
alexqyle committed Dec 4, 2024
1 parent 913e056 commit 1b19602
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 3 deletions.
9 changes: 6 additions & 3 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,14 +542,14 @@ func (i *Lifecycler) loop(ctx context.Context) error {
if i.cfg.ObservePeriod > 0 {
// let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING
// ingesters, but we also signal that it is not fully functional yet.
if err := i.autoJoin(context.Background(), JOINING); err != nil {
if err := i.autoJoin(context.Background(), JOINING, addedInRing); err != nil {
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
}

level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
observeChan = time.After(i.cfg.ObservePeriod)
} else {
if err := i.autoJoin(context.Background(), i.getPreviousState()); err != nil {
if err := i.autoJoin(context.Background(), i.getPreviousState(), addedInRing); err != nil {
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s, state: %s", i.RingName, i.getPreviousState())
}
}
Expand Down Expand Up @@ -892,7 +892,7 @@ func (i *Lifecycler) compareTokens(fromRing Tokens) bool {
}

// autoJoin selects random tokens & moves state to targetState
func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) error {
func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState, alreadyInRing bool) error {
var ringDesc *Desc

err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
Expand All @@ -907,6 +907,9 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState InstanceState) er
// At this point, we should not have any tokens, and we should be in PENDING state.
// Need to make sure we didn't change the num of tokens configured
myTokens, _ := ringDesc.TokensFor(i.ID)
if !alreadyInRing {
myTokens = i.getTokens()
}
needTokens := i.cfg.NumTokens - len(myTokens)

if needTokens == 0 && myTokens.Equals(i.getTokens()) {
Expand Down
102 changes: 102 additions & 0 deletions pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,108 @@ func TestTokenFileOnDisk(t *testing.T) {
}
}

func TestTokenFileOnDisk_WithoutAutoJoinOnStartup(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck

tokenDir := t.TempDir()

lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig.NumTokens = 512
lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"

// Start first ingester.
l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, false, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1))

// First ingester joins the ring
l1.Join()

// Check this ingester joined, is active, and has 512 token.
var expTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
if ok {
expTokens = desc.Ingesters["ing1"].Tokens
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing1"].State == ACTIVE &&
len(desc.Ingesters["ing1"].Tokens) == 512
})

// Change state from ACTIVE to READONLY
err = l1.ChangeState(context.Background(), READONLY)
require.NoError(t, err)
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
return ok &&
desc.Ingesters["ing1"].State == READONLY
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1))

// Start new ingester at same token directory.
lifecyclerConfig.ID = "ing2"
l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, false, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), l2))
defer services.StopAndAwaitTerminated(context.Background(), l2) //nolint:errcheck

// Check this ingester should not in the ring before calling Join
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
_, ingesterInRing := desc.Ingesters["ing2"]
return !ingesterInRing
}
return ok
})

// New ingester joins the ring
l2.Join()

// Check this ingester joined, is in readonly state, and has 512 token.
var actTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
actTokens = desc.Ingesters["ing2"].Tokens
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing2"].State == READONLY &&
len(desc.Ingesters["ing2"].Tokens) == 512
})

// Check for same tokens.
sort.Slice(expTokens, func(i, j int) bool { return expTokens[i] < expTokens[j] })
sort.Slice(actTokens, func(i, j int) bool { return actTokens[i] < actTokens[j] })
for i := 0; i < 512; i++ {
require.Equal(t, expTokens, actTokens)
}
}

// JoinInLeavingState ensures that if the lifecycler starts up and the ring already has it in a LEAVING state that it still is able to auto join
func TestJoinInLeavingState(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
Expand Down

0 comments on commit 1b19602

Please sign in to comment.