Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure unregistered ingester joining the ring after WAL replay #6277

Merged
16 changes: 8 additions & 8 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
}, i.getOldestUnshippedBlockMetric)
}

i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", RingKey, false, cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer))
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", RingKey, true, cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -799,6 +799,13 @@ func (i *Ingester) startingV2ForFlusher(ctx context.Context) error {
}

func (i *Ingester) starting(ctx context.Context) error {
if err := i.openExistingTSDB(ctx); err != nil {
// Try to rollback and close opened TSDBs before halting the ingester.
i.closeAllTSDB()

return errors.Wrap(err, "opening existing TSDBs")
}

// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context
if err := i.lifecycler.StartAsync(context.Background()); err != nil {
return errors.Wrap(err, "failed to start lifecycler")
Expand All @@ -807,13 +814,6 @@ func (i *Ingester) starting(ctx context.Context) error {
return errors.Wrap(err, "failed to start lifecycler")
}

if err := i.openExistingTSDB(ctx); err != nil {
// Try to rollback and close opened TSDBs before halting the ingester.
i.closeAllTSDB()

return errors.Wrap(err, "opening existing TSDBs")
}

i.lifecycler.Join()
alexqyle marked this conversation as resolved.
Show resolved Hide resolved

// let's start the rest of subservices via manager
Expand Down
3 changes: 3 additions & 0 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,8 @@ func (i *Lifecycler) loop(ctx context.Context) error {
return errors.Wrapf(err, "failed to join the ring %s", i.RingName)
}

level.Info(i.logger).Log("msg", "finished init ring", "ring", i.RingName, "state", i.GetState())

// We do various period tasks
var autoJoinAfter <-chan time.Time
var observeChan <-chan time.Time
Expand Down Expand Up @@ -480,6 +482,7 @@ func (i *Lifecycler) loop(ctx context.Context) error {
select {
case <-i.autojoinChan:
autoJoinAfter = time.After(i.cfg.JoinAfter)
level.Info(i.logger).Log("msg", "will do auto-joining after timeout", "timeout", i.cfg.JoinAfter, "state", i.GetState())
case <-autoJoinAfter:
if joined {
continue
Expand Down
Loading