Skip to content

Commit

Permalink
feat: background worker routines to shutdown client for migration (ze…
Browse files Browse the repository at this point in the history
…ta-chain#2538)

* add background threads

* add cancel for child context

* add comments

* remove commented code

* add default constant backoff

* generate files

* resolve comments 1

* resolve comments 1

* resolve comments 2

* rename to callback to clarify terminiology

* remove cancel cause

* generate files

* move changelog to unreleased

* Add bg.OnComplete

* Add maintenance package. Move TSS listener to maintenance

* Fix merge conflicts

* fix tss migration test

* add structured logging

* remove check fro nonce 0

* fix OutboundID generation to make the identifier more unique

---------

Co-authored-by: Dmitry <[email protected]>
Co-authored-by: Charlie Chen <[email protected]>
  • Loading branch information
3 people authored Aug 6, 2024
1 parent b56e758 commit d94047a
Show file tree
Hide file tree
Showing 18 changed files with 397 additions and 166 deletions.
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [2578](https://github.com/zeta-chain/node/pull/2578) - Add Gateway address in protocol contract list
* [2634](https://github.com/zeta-chain/node/pull/2634) - add support for EIP-1559 gas fees
* [2597](https://github.com/zeta-chain/node/pull/2597) - Add generic rpc metrics to zetaclient
* [2538](https://github.com/zeta-chain/node/pull/2538) - add background worker routines to shutdown zetaclientd when needed for tss migration


## v19.0.0
Expand Down
133 changes: 5 additions & 128 deletions cmd/zetaclientd-supervisor/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
"runtime"
"strings"
"sync"
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
upgradetypes "github.com/cosmos/cosmos-sdk/x/upgrade/types"
"github.com/hashicorp/go-getter"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

observertypes "github.com/zeta-chain/zetacore/x/observer/types"
"github.com/zeta-chain/zetacore/zetaclient/config"
)

Expand Down Expand Up @@ -79,7 +78,7 @@ func newZetaclientdSupervisor(
logger = logger.With().Str("module", "zetaclientdSupervisor").Logger()
conn, err := grpc.Dial(
fmt.Sprintf("%s:9090", zetaCoreURL),
grpc.WithInsecure(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("grpc dial: %w", err)
Expand All @@ -99,9 +98,6 @@ func newZetaclientdSupervisor(
func (s *zetaclientdSupervisor) Start(ctx context.Context) {
go s.watchForVersionChanges(ctx)
go s.handleCoreUpgradePlan(ctx)
go s.handleNewKeygen(ctx)
go s.handleNewTSSKeyGeneration(ctx)
go s.handleTSSUpdate(ctx)
}

func (s *zetaclientdSupervisor) WaitForReloadSignal(ctx context.Context) {
Expand Down Expand Up @@ -177,125 +173,6 @@ func (s *zetaclientdSupervisor) watchForVersionChanges(ctx context.Context) {
}
}

func (s *zetaclientdSupervisor) handleTSSUpdate(ctx context.Context) {
maxRetries := 11
retryInterval := 5 * time.Second

// TODO : use retry library under pkg/retry
// https://github.com/zeta-chain/node/issues/2492
for i := 0; i < maxRetries; i++ {
client := observertypes.NewQueryClient(s.zetacoredConn)
tss, err := client.TSS(ctx, &observertypes.QueryGetTSSRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("unable to get original tss")
time.Sleep(retryInterval)
continue
}
i = 0
for {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
tssNew, err := client.TSS(ctx, &observertypes.QueryGetTSSRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("unable to get tss")
continue
}

if tssNew.TSS.TssPubkey == tss.TSS.TssPubkey {
continue
}

tss = tssNew
s.logger.Info().
Msgf("tss address is updated from %s to %s", tss.TSS.TssPubkey, tssNew.TSS.TssPubkey)
time.Sleep(6 * time.Second)
s.logger.Info().Msg("restarting zetaclientd to update tss address")
s.restartChan <- syscall.SIGHUP
}
}
s.logger.Warn().Msg("handleTSSUpdate exiting without success")
}

func (s *zetaclientdSupervisor) handleNewTSSKeyGeneration(ctx context.Context) {
maxRetries := 11
retryInterval := 5 * time.Second

// TODO : use retry library under pkg/retry
for i := 0; i < maxRetries; i++ {
client := observertypes.NewQueryClient(s.zetacoredConn)
alltss, err := client.TssHistory(ctx, &observertypes.QueryTssHistoryRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("unable to get tss original history")
time.Sleep(retryInterval)
continue
}
i = 0
tssLenCurrent := len(alltss.TssList)
for {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
tssListNew, err := client.TssHistory(ctx, &observertypes.QueryTssHistoryRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("unable to get tss new history")
continue
}
tssLenUpdated := len(tssListNew.TssList)

if tssLenUpdated == tssLenCurrent {
continue
}
if tssLenUpdated < tssLenCurrent {
tssLenCurrent = len(tssListNew.TssList)
continue
}

tssLenCurrent = tssLenUpdated
s.logger.Info().Msgf("tss list updated from %d to %d", tssLenCurrent, tssLenUpdated)
time.Sleep(5 * time.Second)
s.logger.Info().Msg("restarting zetaclientd to update tss list")
s.restartChan <- syscall.SIGHUP
}
}
s.logger.Warn().Msg("handleNewTSSKeyGeneration exiting without success")
}

func (s *zetaclientdSupervisor) handleNewKeygen(ctx context.Context) {
client := observertypes.NewQueryClient(s.zetacoredConn)
prevKeygenBlock := int64(0)
for {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
resp, err := client.Keygen(ctx, &observertypes.QueryGetKeygenRequest{})
if err != nil {
s.logger.Warn().Err(err).Msg("unable to get keygen")
continue
}
if resp.Keygen == nil {
s.logger.Warn().Err(err).Msg("keygen is nil")
continue
}

if resp.Keygen.Status != observertypes.KeygenStatus_PendingKeygen {
continue
}
keygenBlock := resp.Keygen.BlockNumber
if prevKeygenBlock == keygenBlock {
continue
}
prevKeygenBlock = keygenBlock
s.logger.Info().Msgf("got new keygen at block %d", keygenBlock)
s.restartChan <- syscall.SIGHUP
}
}
func (s *zetaclientdSupervisor) handleCoreUpgradePlan(ctx context.Context) {
client := upgradetypes.NewQueryClient(s.zetacoredConn)

Expand Down Expand Up @@ -345,16 +222,16 @@ func (s *zetaclientdSupervisor) downloadZetaclientd(ctx context.Context, plan *u
if plan.Info == "" {
return errors.New("upgrade info empty")
}
var config upgradeConfig
err := json.Unmarshal([]byte(plan.Info), &config)
var cfg upgradeConfig
err := json.Unmarshal([]byte(plan.Info), &cfg)
if err != nil {
return fmt.Errorf("unmarshal upgrade config: %w", err)
}

s.logger.Info().Msg("downloading zetaclientd")

binKey := fmt.Sprintf("%s-%s/%s", zetaclientdBinaryName, runtime.GOOS, runtime.GOARCH)
binURL, ok := config.Binaries[binKey]
binURL, ok := cfg.Binaries[binKey]
if !ok {
return fmt.Errorf("no binary found for: %s", binKey)
}
Expand Down
12 changes: 10 additions & 2 deletions cmd/zetaclientd-supervisor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"syscall"
"time"

"cosmossdk.io/errors"
"golang.org/x/sync/errgroup"

"github.com/zeta-chain/zetacore/app"
Expand Down Expand Up @@ -69,10 +70,17 @@ func main() {
cmd.Stdin = &passwordInputBuffer

eg, ctx := errgroup.WithContext(ctx)
eg.Go(cmd.Run)
eg.Go(func() error {
defer cancel()
if err := cmd.Run(); err != nil {
return errors.Wrap(err, "zetaclient process failed")
}

logger.Info().Msg("zetaclient process exited")
return nil
})
eg.Go(func() error {
supervisor.WaitForReloadSignal(ctx)
cancel()
return nil
})
eg.Go(func() error {
Expand Down
22 changes: 16 additions & 6 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/zeta-chain/zetacore/zetaclient/chains/base"
"github.com/zeta-chain/zetacore/zetaclient/config"
zctx "github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/maintenance"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
"github.com/zeta-chain/zetacore/zetaclient/orchestrator"
mc "github.com/zeta-chain/zetacore/zetaclient/tss"
Expand Down Expand Up @@ -206,6 +207,16 @@ func start(_ *cobra.Command, _ []string) error {
// Set P2P ID for telemetry
telemetryServer.SetP2PID(server.GetLocalPeerID())

// Creating a channel to listen for os signals (or other signals)
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM)

// Maintenance workers ============
maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() {
masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.")
signalChannel <- syscall.SIGTERM
})

// Generate a new TSS if keygen is set and add it into the tss server
// If TSS has already been generated, and keygen was successful ; we use the existing TSS
err = GenerateTSS(ctx, masterLogger, zetacoreClient, server)
Expand Down Expand Up @@ -257,7 +268,7 @@ func start(_ *cobra.Command, _ []string) error {
// Update Current TSS value from zetacore, if TSS keygen is successful, the TSS address is set on zeta-core
// Returns err if the RPC call fails as zeta client needs the current TSS address to be set
// This is only needed in case of a new Keygen , as the TSS address is set on zetacore only after the keygen is successful i.e enough votes have been broadcast
currentTss, err := zetacoreClient.GetCurrentTSS(ctx)
currentTss, err := zetacoreClient.GetTSS(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetCurrentTSS error")
return err
Expand Down Expand Up @@ -350,11 +361,10 @@ func start(_ *cobra.Command, _ []string) error {
// defer zetaSupplyChecker.Stop()
//}

startLogger.Info().Msgf("awaiting the os.Interrupt, syscall.SIGTERM signals...")
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
sig := <-ch
startLogger.Info().Msgf("stop signal received: %s", sig)
startLogger.Info().Msgf("Zetaclientd is running")

sig := <-signalChannel
startLogger.Info().Msgf("Stop signal received: %q", sig)

zetacoreClient.Stop()

Expand Down
2 changes: 2 additions & 0 deletions contrib/localnet/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ services:
- ETHDEV_ENDPOINT=http://eth:8545
- HOTKEY_BACKEND=file
- HOTKEY_PASSWORD=password # test purposes only
restart: always
volumes:
- ssh:/root/.ssh
- preparams:/root/preparams
Expand All @@ -142,6 +143,7 @@ services:
- ETHDEV_ENDPOINT=http://eth:8545
- HOTKEY_BACKEND=file
- HOTKEY_PASSWORD=password # test purposes only
restart: always
volumes:
- ssh:/root/.ssh
- preparams:/root/preparams
Expand Down
38 changes: 28 additions & 10 deletions pkg/bg/bg.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
)

type config struct {
name string
logger zerolog.Logger
name string
logger zerolog.Logger
onComplete func()
}

type Opt func(*config)
Expand All @@ -20,15 +21,22 @@ func WithName(name string) Opt {
return func(cfg *config) { cfg.name = name }
}

// OnComplete is a callback function that is called
// when the background task is completed without an error
func OnComplete(fn func()) Opt {
return func(cfg *config) { cfg.onComplete = fn }
}

func WithLogger(logger zerolog.Logger) Opt {
return func(cfg *config) { cfg.logger = logger }
}

// Work emits a new task in the background
func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) {
cfg := config{
name: "",
logger: zerolog.Nop(),
name: "",
logger: zerolog.Nop(),
onComplete: nil,
}

for _, opt := range opts {
Expand All @@ -45,10 +53,25 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) {

if err := f(ctx); err != nil {
logError(err, cfg, false)
return
}

if cfg.onComplete != nil {
cfg.onComplete()
}

cfg.logger.Info().Str("worker.name", cfg.getName()).Msg("Background task completed")
}()
}

func (c config) getName() string {
if c.name != "" {
return c.name
}

return "unknown"
}

func logError(err error, cfg config, isPanic bool) {
if err == nil {
return
Expand All @@ -71,10 +94,5 @@ func logError(err error, cfg config, isPanic bool) {
evt.Bytes("stack_trace", buf)
}

name := cfg.name
if name == "" {
name = "unknown"
}

evt.Str("worker.name", name).Msg("Background task failed")
evt.Str("worker.name", cfg.getName()).Msg("Background task failed")
}
Loading

0 comments on commit d94047a

Please sign in to comment.