From 65c3fbebe14db5f09affe28340d9e008f9de41e6 Mon Sep 17 00:00:00 2001 From: Tanmay Date: Tue, 6 Aug 2024 13:47:32 -0400 Subject: [PATCH] feat: background worker routines to shutdown client for migration (#2538) * add background threads * add cancel for child context * add comments * remove commented code * add default constant backoff * generate files * resolve comments 1 * resolve comments 1 * resolve comments 2 * rename to callback to clarify terminology * remove cancel cause * generate files * move changelog to unreleased * Add bg.OnComplete * Add maintenance package. Move TSS listener to maintenance * Fix merge conflicts * fix tss migration test * add structured logging * remove check for nonce 0 * fix OutboundID generation to make the identifier more unique --------- Co-authored-by: Dmitry Co-authored-by: Charlie Chen --- changelog.md | 1 + cmd/zetaclientd-supervisor/lib.go | 133 +------------ cmd/zetaclientd-supervisor/main.go | 12 +- cmd/zetaclientd/start.go | 22 ++- contrib/localnet/docker-compose.yml | 2 + pkg/bg/bg.go | 38 +++- pkg/bg/bg_test.go | 30 +++ pkg/retry/retry.go | 5 + zetaclient/chains/base/observer.go | 8 +- zetaclient/chains/base/observer_test.go | 42 +++++ .../chains/bitcoin/observer/outbound.go | 1 + zetaclient/chains/interfaces/interfaces.go | 2 + zetaclient/maintenance/tss_listener.go | 175 ++++++++++++++++++ zetaclient/orchestrator/orchestrator.go | 3 - zetaclient/testutils/mocks/zetacore_client.go | 58 ++++++ zetaclient/zetacore/client.go | 2 +- zetaclient/zetacore/client_query_observer.go | 25 ++- zetaclient/zetacore/client_query_test.go | 4 +- 18 files changed, 397 insertions(+), 166 deletions(-) create mode 100644 zetaclient/maintenance/tss_listener.go diff --git a/changelog.md b/changelog.md index 8f383deb44..8d2af59cdb 100644 --- a/changelog.md +++ b/changelog.md @@ -7,6 +7,7 @@ * [2578](https://github.com/zeta-chain/node/pull/2578) - Add Gateway address in protocol contract list * 
[2634](https://github.com/zeta-chain/node/pull/2634) - add support for EIP-1559 gas fees * [2597](https://github.com/zeta-chain/node/pull/2597) - Add generic rpc metrics to zetaclient +* [2538](https://github.com/zeta-chain/node/pull/2538) - add background worker routines to shutdown zetaclientd when needed for tss migration ## v19.0.0 diff --git a/cmd/zetaclientd-supervisor/lib.go b/cmd/zetaclientd-supervisor/lib.go index 71f492e88b..e65782c6b1 100644 --- a/cmd/zetaclientd-supervisor/lib.go +++ b/cmd/zetaclientd-supervisor/lib.go @@ -12,7 +12,6 @@ import ( "runtime" "strings" "sync" - "syscall" "time" "github.com/cosmos/cosmos-sdk/client/grpc/tmservice" @@ -20,8 +19,8 @@ import ( "github.com/hashicorp/go-getter" "github.com/rs/zerolog" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" - observertypes "github.com/zeta-chain/zetacore/x/observer/types" "github.com/zeta-chain/zetacore/zetaclient/config" ) @@ -79,7 +78,7 @@ func newZetaclientdSupervisor( logger = logger.With().Str("module", "zetaclientdSupervisor").Logger() conn, err := grpc.Dial( fmt.Sprintf("%s:9090", zetaCoreURL), - grpc.WithInsecure(), + grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { return nil, fmt.Errorf("grpc dial: %w", err) @@ -99,9 +98,6 @@ func newZetaclientdSupervisor( func (s *zetaclientdSupervisor) Start(ctx context.Context) { go s.watchForVersionChanges(ctx) go s.handleCoreUpgradePlan(ctx) - go s.handleNewKeygen(ctx) - go s.handleNewTSSKeyGeneration(ctx) - go s.handleTSSUpdate(ctx) } func (s *zetaclientdSupervisor) WaitForReloadSignal(ctx context.Context) { @@ -177,125 +173,6 @@ func (s *zetaclientdSupervisor) watchForVersionChanges(ctx context.Context) { } } -func (s *zetaclientdSupervisor) handleTSSUpdate(ctx context.Context) { - maxRetries := 11 - retryInterval := 5 * time.Second - - // TODO : use retry library under pkg/retry - // https://github.com/zeta-chain/node/issues/2492 - for i := 0; i < maxRetries; i++ { - client := 
observertypes.NewQueryClient(s.zetacoredConn) - tss, err := client.TSS(ctx, &observertypes.QueryGetTSSRequest{}) - if err != nil { - s.logger.Warn().Err(err).Msg("unable to get original tss") - time.Sleep(retryInterval) - continue - } - i = 0 - for { - select { - case <-time.After(time.Second): - case <-ctx.Done(): - return - } - tssNew, err := client.TSS(ctx, &observertypes.QueryGetTSSRequest{}) - if err != nil { - s.logger.Warn().Err(err).Msg("unable to get tss") - continue - } - - if tssNew.TSS.TssPubkey == tss.TSS.TssPubkey { - continue - } - - tss = tssNew - s.logger.Info(). - Msgf("tss address is updated from %s to %s", tss.TSS.TssPubkey, tssNew.TSS.TssPubkey) - time.Sleep(6 * time.Second) - s.logger.Info().Msg("restarting zetaclientd to update tss address") - s.restartChan <- syscall.SIGHUP - } - } - s.logger.Warn().Msg("handleTSSUpdate exiting without success") -} - -func (s *zetaclientdSupervisor) handleNewTSSKeyGeneration(ctx context.Context) { - maxRetries := 11 - retryInterval := 5 * time.Second - - // TODO : use retry library under pkg/retry - for i := 0; i < maxRetries; i++ { - client := observertypes.NewQueryClient(s.zetacoredConn) - alltss, err := client.TssHistory(ctx, &observertypes.QueryTssHistoryRequest{}) - if err != nil { - s.logger.Warn().Err(err).Msg("unable to get tss original history") - time.Sleep(retryInterval) - continue - } - i = 0 - tssLenCurrent := len(alltss.TssList) - for { - select { - case <-time.After(time.Second): - case <-ctx.Done(): - return - } - tssListNew, err := client.TssHistory(ctx, &observertypes.QueryTssHistoryRequest{}) - if err != nil { - s.logger.Warn().Err(err).Msg("unable to get tss new history") - continue - } - tssLenUpdated := len(tssListNew.TssList) - - if tssLenUpdated == tssLenCurrent { - continue - } - if tssLenUpdated < tssLenCurrent { - tssLenCurrent = len(tssListNew.TssList) - continue - } - - tssLenCurrent = tssLenUpdated - s.logger.Info().Msgf("tss list updated from %d to %d", tssLenCurrent, 
tssLenUpdated) - time.Sleep(5 * time.Second) - s.logger.Info().Msg("restarting zetaclientd to update tss list") - s.restartChan <- syscall.SIGHUP - } - } - s.logger.Warn().Msg("handleNewTSSKeyGeneration exiting without success") -} - -func (s *zetaclientdSupervisor) handleNewKeygen(ctx context.Context) { - client := observertypes.NewQueryClient(s.zetacoredConn) - prevKeygenBlock := int64(0) - for { - select { - case <-time.After(time.Second): - case <-ctx.Done(): - return - } - resp, err := client.Keygen(ctx, &observertypes.QueryGetKeygenRequest{}) - if err != nil { - s.logger.Warn().Err(err).Msg("unable to get keygen") - continue - } - if resp.Keygen == nil { - s.logger.Warn().Err(err).Msg("keygen is nil") - continue - } - - if resp.Keygen.Status != observertypes.KeygenStatus_PendingKeygen { - continue - } - keygenBlock := resp.Keygen.BlockNumber - if prevKeygenBlock == keygenBlock { - continue - } - prevKeygenBlock = keygenBlock - s.logger.Info().Msgf("got new keygen at block %d", keygenBlock) - s.restartChan <- syscall.SIGHUP - } -} func (s *zetaclientdSupervisor) handleCoreUpgradePlan(ctx context.Context) { client := upgradetypes.NewQueryClient(s.zetacoredConn) @@ -345,8 +222,8 @@ func (s *zetaclientdSupervisor) downloadZetaclientd(ctx context.Context, plan *u if plan.Info == "" { return errors.New("upgrade info empty") } - var config upgradeConfig - err := json.Unmarshal([]byte(plan.Info), &config) + var cfg upgradeConfig + err := json.Unmarshal([]byte(plan.Info), &cfg) if err != nil { return fmt.Errorf("unmarshal upgrade config: %w", err) } @@ -354,7 +231,7 @@ func (s *zetaclientdSupervisor) downloadZetaclientd(ctx context.Context, plan *u s.logger.Info().Msg("downloading zetaclientd") binKey := fmt.Sprintf("%s-%s/%s", zetaclientdBinaryName, runtime.GOOS, runtime.GOARCH) - binURL, ok := config.Binaries[binKey] + binURL, ok := cfg.Binaries[binKey] if !ok { return fmt.Errorf("no binary found for: %s", binKey) } diff --git a/cmd/zetaclientd-supervisor/main.go 
b/cmd/zetaclientd-supervisor/main.go index ee1e247be4..d7179d6948 100644 --- a/cmd/zetaclientd-supervisor/main.go +++ b/cmd/zetaclientd-supervisor/main.go @@ -10,6 +10,7 @@ import ( "syscall" "time" + "cosmossdk.io/errors" "golang.org/x/sync/errgroup" "github.com/zeta-chain/zetacore/app" @@ -69,10 +70,17 @@ func main() { cmd.Stdin = &passwordInputBuffer eg, ctx := errgroup.WithContext(ctx) - eg.Go(cmd.Run) + eg.Go(func() error { + defer cancel() + if err := cmd.Run(); err != nil { + return errors.Wrap(err, "zetaclient process failed") + } + + logger.Info().Msg("zetaclient process exited") + return nil + }) eg.Go(func() error { supervisor.WaitForReloadSignal(ctx) - cancel() return nil }) eg.Go(func() error { diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 281043cb27..13c23b2bac 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -26,6 +26,7 @@ import ( "github.com/zeta-chain/zetacore/zetaclient/chains/base" "github.com/zeta-chain/zetacore/zetaclient/config" zctx "github.com/zeta-chain/zetacore/zetaclient/context" + "github.com/zeta-chain/zetacore/zetaclient/maintenance" "github.com/zeta-chain/zetacore/zetaclient/metrics" "github.com/zeta-chain/zetacore/zetaclient/orchestrator" mc "github.com/zeta-chain/zetacore/zetaclient/tss" @@ -206,6 +207,16 @@ func start(_ *cobra.Command, _ []string) error { // Set P2P ID for telemetry telemetryServer.SetP2PID(server.GetLocalPeerID()) + // Creating a channel to listen for os signals (or other signals) + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, syscall.SIGINT, syscall.SIGTERM) + + // Maintenance workers ============ + maintenance.NewTSSListener(zetacoreClient, masterLogger).Listen(ctx, func() { + masterLogger.Info().Msg("TSS listener received an action to shutdown zetaclientd.") + signalChannel <- syscall.SIGTERM + }) + // Generate a new TSS if keygen is set and add it into the tss server // If TSS has already been generated, and keygen was successful ; we 
use the existing TSS err = GenerateTSS(ctx, masterLogger, zetacoreClient, server) @@ -257,7 +268,7 @@ func start(_ *cobra.Command, _ []string) error { // Update Current TSS value from zetacore, if TSS keygen is successful, the TSS address is set on zeta-core // Returns err if the RPC call fails as zeta client needs the current TSS address to be set // This is only needed in case of a new Keygen , as the TSS address is set on zetacore only after the keygen is successful i.e enough votes have been broadcast - currentTss, err := zetacoreClient.GetCurrentTSS(ctx) + currentTss, err := zetacoreClient.GetTSS(ctx) if err != nil { startLogger.Error().Err(err).Msg("GetCurrentTSS error") return err @@ -350,11 +361,10 @@ func start(_ *cobra.Command, _ []string) error { // defer zetaSupplyChecker.Stop() //} - startLogger.Info().Msgf("awaiting the os.Interrupt, syscall.SIGTERM signals...") - ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - sig := <-ch - startLogger.Info().Msgf("stop signal received: %s", sig) + startLogger.Info().Msgf("Zetaclientd is running") + + sig := <-signalChannel + startLogger.Info().Msgf("Stop signal received: %q", sig) zetacoreClient.Stop() diff --git a/contrib/localnet/docker-compose.yml b/contrib/localnet/docker-compose.yml index 58f8346f82..606b4db4be 100644 --- a/contrib/localnet/docker-compose.yml +++ b/contrib/localnet/docker-compose.yml @@ -126,6 +126,7 @@ services: - ETHDEV_ENDPOINT=http://eth:8545 - HOTKEY_BACKEND=file - HOTKEY_PASSWORD=password # test purposes only + restart: always volumes: - ssh:/root/.ssh - preparams:/root/preparams @@ -142,6 +143,7 @@ services: - ETHDEV_ENDPOINT=http://eth:8545 - HOTKEY_BACKEND=file - HOTKEY_PASSWORD=password # test purposes only + restart: always volumes: - ssh:/root/.ssh - preparams:/root/preparams diff --git a/pkg/bg/bg.go b/pkg/bg/bg.go index 3fd6a74fc3..791c00787a 100644 --- a/pkg/bg/bg.go +++ b/pkg/bg/bg.go @@ -10,8 +10,9 @@ import ( ) type config struct { - name 
string - logger zerolog.Logger + name string + logger zerolog.Logger + onComplete func() } type Opt func(*config) @@ -20,6 +21,12 @@ func WithName(name string) Opt { return func(cfg *config) { cfg.name = name } } +// OnComplete is a callback function that is called +// when the background task is completed without an error +func OnComplete(fn func()) Opt { + return func(cfg *config) { cfg.onComplete = fn } +} + func WithLogger(logger zerolog.Logger) Opt { return func(cfg *config) { cfg.logger = logger } } @@ -27,8 +34,9 @@ func WithLogger(logger zerolog.Logger) Opt { // Work emits a new task in the background func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) { cfg := config{ - name: "", - logger: zerolog.Nop(), + name: "", + logger: zerolog.Nop(), + onComplete: nil, } for _, opt := range opts { @@ -45,10 +53,25 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) { if err := f(ctx); err != nil { logError(err, cfg, false) + return + } + + if cfg.onComplete != nil { + cfg.onComplete() } + + cfg.logger.Info().Str("worker.name", cfg.getName()).Msg("Background task completed") }() } +func (c config) getName() string { + if c.name != "" { + return c.name + } + + return "unknown" +} + func logError(err error, cfg config, isPanic bool) { if err == nil { return @@ -71,10 +94,5 @@ func logError(err error, cfg config, isPanic bool) { evt.Bytes("stack_trace", buf) } - name := cfg.name - if name == "" { - name = "unknown" - } - - evt.Str("worker.name", name).Msg("Background task failed") + evt.Str("worker.name", cfg.getName()).Msg("Background task failed") } diff --git a/pkg/bg/bg_test.go b/pkg/bg/bg_test.go index 6bbcffd003..bba8180f16 100644 --- a/pkg/bg/bg_test.go +++ b/pkg/bg/bg_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sync/atomic" "testing" "time" @@ -53,6 +54,35 @@ func TestWork(t *testing.T) { assert.JSONEq(t, expected, out.String()) }) + t.Run("with name and logger and onComplete", func(t *testing.T) { 
+ // ARRANGE + // Given a logger + out := &bytes.Buffer{} + logger := zerolog.New(out) + check := int64(0) + + // And a call returning an error + call := func(ctx context.Context) error { + time.Sleep(100 * time.Millisecond) + return nil + } + + complete := func() { + atomic.AddInt64(&check, 1) + } + + // ACT + Work(ctx, call, WithName("hello"), WithLogger(logger), OnComplete(complete)) + time.Sleep(200 * time.Millisecond) + + // Check the log output + const expected = `{"level":"info", "message":"Background task completed", "worker.name":"hello"}` + assert.JSONEq(t, expected, out.String()) + + // Check onComplete + assert.Equal(t, int64(1), check) + }) + t.Run("panic recovery", func(t *testing.T) { // ARRANGE // Given a logger diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 291ceafe23..d3001691d6 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -46,6 +46,11 @@ func DefaultBackoff() Backoff { return backoff.WithMaxRetries(bo, 5) } +func DefaultConstantBackoff() Backoff { + bo := backoff.NewConstantBackOff(5 * time.Second) + return backoff.WithMaxRetries(bo, 10) +} + // Do executes the callback function with the default backoff config. // It will retry a callback ONLY if error is retryable. func Do(cb Callback) error { diff --git a/zetaclient/chains/base/observer.go b/zetaclient/chains/base/observer.go index 428946f0bf..6021b9219e 100644 --- a/zetaclient/chains/base/observer.go +++ b/zetaclient/chains/base/observer.go @@ -268,8 +268,14 @@ func (ob *Observer) WithHeaderCache(cache *lru.Cache) *Observer { } // OutboundID returns a unique identifier for the outbound transaction. +// The identifier is now used as the key for maps that store outbound related data (e.g. transaction, receipt, etc). 
func (ob *Observer) OutboundID(nonce uint64) string { - return fmt.Sprintf("%d-%d", ob.chain.ChainId, nonce) + // all chains uses EVM address as part of the key except bitcoin + tssAddress := ob.tss.EVMAddress().String() + if ob.chain.Consensus == chains.Consensus_bitcoin { + tssAddress = ob.tss.BTCAddress() + } + return fmt.Sprintf("%d-%s-%d", ob.chain.ChainId, tssAddress, nonce) } // DB returns the database for the observer. diff --git a/zetaclient/chains/base/observer_test.go b/zetaclient/chains/base/observer_test.go index b40802c0a7..50148a551b 100644 --- a/zetaclient/chains/base/observer_test.go +++ b/zetaclient/chains/base/observer_test.go @@ -2,6 +2,7 @@ package base_test import ( "context" + "fmt" "os" "testing" @@ -248,6 +249,47 @@ func TestObserverGetterAndSetter(t *testing.T) { }) } +func TestOutboundID(t *testing.T) { + tests := []struct { + name string + chain chains.Chain + tss interfaces.TSSSigner + nonce uint64 + }{ + { + name: "should get correct outbound id for Ethereum chain", + chain: chains.Ethereum, + tss: mocks.NewTSSMainnet(), + nonce: 100, + }, + { + name: "should get correct outbound id for Bitcoin chain", + chain: chains.BitcoinMainnet, + tss: mocks.NewTSSMainnet(), + nonce: 200, + }, + } + + // run tests + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // create observer + ob := createObserver(t, tt.chain) + ob = ob.WithTSS(tt.tss) + + // get outbound id + outboundID := ob.OutboundID(tt.nonce) + + // expected outbound id + exepctedID := fmt.Sprintf("%d-%s-%d", tt.chain.ChainId, tt.tss.EVMAddress(), tt.nonce) + if tt.chain.Consensus == chains.Consensus_bitcoin { + exepctedID = fmt.Sprintf("%d-%s-%d", tt.chain.ChainId, tt.tss.BTCAddress(), tt.nonce) + } + require.Equal(t, exepctedID, outboundID) + }) + } +} + func TestLoadLastBlockScanned(t *testing.T) { chain := chains.Ethereum envvar := base.EnvVarLatestBlockByChain(chain) diff --git a/zetaclient/chains/bitcoin/observer/outbound.go 
b/zetaclient/chains/bitcoin/observer/outbound.go index 009a49759e..55b56f7a83 100644 --- a/zetaclient/chains/bitcoin/observer/outbound.go +++ b/zetaclient/chains/bitcoin/observer/outbound.go @@ -151,6 +151,7 @@ func (ob *Observer) VoteOutboundIfConfirmed( // prevents double spending of same UTXO. However, for nonce 0, we don't have a prior nonce (e.g., -1) // for the signer to check against when making the payment. Signer treats nonce 0 as a special case in downstream code. if nonce == 0 { + ob.logger.Outbound.Info().Msgf("VoteOutboundIfConfirmed: outbound %s is nonce 0", outboundID) return false, nil } diff --git a/zetaclient/chains/interfaces/interfaces.go b/zetaclient/chains/interfaces/interfaces.go index 8377d2ba33..a11af2fc73 100644 --- a/zetaclient/chains/interfaces/interfaces.go +++ b/zetaclient/chains/interfaces/interfaces.go @@ -107,6 +107,8 @@ type ZetacoreClient interface { GetKeys() keyinterfaces.ObserverKeys GetKeyGen(ctx context.Context) (observertypes.Keygen, error) + GetTSS(ctx context.Context) (observertypes.TSS, error) + GetTSSHistory(ctx context.Context) ([]observertypes.TSS, error) GetBlockHeight(ctx context.Context) (int64, error) GetBlockHeaderChainState(ctx context.Context, chainID int64) (*lightclienttypes.ChainState, error) diff --git a/zetaclient/maintenance/tss_listener.go b/zetaclient/maintenance/tss_listener.go new file mode 100644 index 0000000000..e42ead57c4 --- /dev/null +++ b/zetaclient/maintenance/tss_listener.go @@ -0,0 +1,175 @@ +// Package maintenance provides maintenance functionalities for the zetaclient. 
+package maintenance + +import ( + "context" + "time" + + "cosmossdk.io/errors" + "github.com/rs/zerolog" + + "github.com/zeta-chain/zetacore/pkg/bg" + "github.com/zeta-chain/zetacore/pkg/retry" + observertypes "github.com/zeta-chain/zetacore/x/observer/types" + "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" +) + +const tssListenerTicker = 5 * time.Second + +// TSSListener is a struct that listens for TSS updates, new keygen, and new TSS key generation. +type TSSListener struct { + client interfaces.ZetacoreClient + logger zerolog.Logger +} + +// NewTSSListener creates a new TSSListener. +func NewTSSListener(client interfaces.ZetacoreClient, logger zerolog.Logger) *TSSListener { + log := logger.With().Str("module", "tss_listener").Logger() + + return &TSSListener{ + client: client, + logger: log, + } +} + +// Listen listens for any maintenance regarding TSS and calls action specified. Works in the background. +func (tl *TSSListener) Listen(ctx context.Context, action func()) { + var ( + withLogger = bg.WithLogger(tl.logger) + onComplete = bg.OnComplete(action) + ) + + bg.Work(ctx, tl.waitForUpdate, bg.WithName("tss.wait_for_update"), withLogger, onComplete) + bg.Work(ctx, tl.waitForNewKeyGeneration, bg.WithName("tss.wait_for_generation"), withLogger, onComplete) + bg.Work(ctx, tl.waitForNewKeygen, bg.WithName("tss.wait_for_keygen"), withLogger, onComplete) +} + +// waitForUpdate listens for TSS updates. 
Returns `nil` when the TSS address is updated +func (tl *TSSListener) waitForUpdate(ctx context.Context) error { + // Initial TSS retrieval + tss, err := retry.DoTypedWithBackoffAndRetry( + func() (observertypes.TSS, error) { return tl.client.GetTSS(ctx) }, + retry.DefaultConstantBackoff(), + ) + + if err != nil { + return errors.Wrap(err, "unable to get initial tss") + } + + ticker := time.NewTicker(tssListenerTicker) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + tssNew, err := tl.client.GetTSS(ctx) + if err != nil { + tl.logger.Warn().Err(err).Msg("unable to get new tss") + continue + } + // If the TSS address is not updated, continue loop + if tssNew.TssPubkey == tss.TssPubkey { + continue + } + + tl.logger.Info(). + Str("tss.old", tss.TssPubkey). + Str("tss.new", tssNew.TssPubkey). + Msgf("TSS address is updated") + + return nil + case <-ctx.Done(): + tl.logger.Info().Msg("waitForTSSUpdate stopped") + return nil + } + } +} + +// waitForNewKeyGeneration waits for new TSS key generation; it returns when a new key is generated +// It uses the length of the TSS list to determine if a new key is generated +func (tl *TSSListener) waitForNewKeyGeneration(ctx context.Context) error { + // Initial TSS history retrieval + tssHistoricalList, err := retry.DoTypedWithBackoffAndRetry( + func() ([]observertypes.TSS, error) { return tl.client.GetTSSHistory(ctx) }, + retry.DefaultConstantBackoff(), + ) + if err != nil { + return errors.Wrap(err, "failed to get initial tss history") + } + + tssLen := len(tssHistoricalList) + + ticker := time.NewTicker(tssListenerTicker) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + tssHistoricalListNew, err := tl.client.GetTSSHistory(ctx) + if err != nil { + continue + } + + tssLenUpdated := len(tssHistoricalListNew) + // New tss key has not been added to list , continue loop + if tssLenUpdated <= tssLen { + continue + } + + tl.logger.Info(). + Int("tssLen", tssLen). + Int("tssLenUpdated", tssLenUpdated). 
+ Msg("tss list updated") + return nil + case <-ctx.Done(): + tl.logger.Info().Msg("waitForNewKeyGeneration stopped") + return nil + } + } +} + +// waitForNewKeygen is a background thread that listens for new keygen; it returns when a new keygen is set +func (tl *TSSListener) waitForNewKeygen(ctx context.Context) error { + // Initial Keygen retrieval + keygen, err := tl.client.GetKeyGen(ctx) + if err != nil { + return errors.Wrap(err, "failed to get initial tss history") + } + + ticker := time.NewTicker(tssListenerTicker) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + keygenUpdated, err := tl.client.GetKeyGen(ctx) + switch { + case err != nil: + tl.logger.Warn().Err(err).Msg("unable to get keygen") + continue + // Keygen is not pending it has already been successfully generated, continue loop + case keygenUpdated.Status == observertypes.KeygenStatus_KeyGenSuccess: + continue + // Keygen failed we to need to wait until a new keygen is set, continue loop + case keygenUpdated.Status == observertypes.KeygenStatus_KeyGenFailed: + continue + // Keygen is pending but block number is not updated, continue loop. + // Most likely the zetaclient is waiting for the keygen block to arrive. + case keygenUpdated.Status == observertypes.KeygenStatus_PendingKeygen && keygenUpdated.BlockNumber <= keygen.BlockNumber: + continue + } + + // Trigger restart only when the following conditions are met: + // 1. Keygen is pending + // 2. Block number is updated + + tl.logger.Info(). + Int64("blockNumber", keygenUpdated.BlockNumber). 
+ Msg("got new keygen") + return nil + case <-ctx.Done(): + tl.logger.Info().Msg("waitForNewKeygen stopped") + return nil + } + } +} diff --git a/zetaclient/orchestrator/orchestrator.go b/zetaclient/orchestrator/orchestrator.go index 8bff065099..36e1046b2e 100644 --- a/zetaclient/orchestrator/orchestrator.go +++ b/zetaclient/orchestrator/orchestrator.go @@ -137,9 +137,6 @@ func (oc *Orchestrator) Start(ctx context.Context) error { shutdownOrchestrator := func() { // now stop orchestrator and all observers close(oc.stop) - for _, c := range oc.observerMap { - c.Stop() - } } oc.zetacoreClient.OnBeforeStop(shutdownOrchestrator) diff --git a/zetaclient/testutils/mocks/zetacore_client.go b/zetaclient/testutils/mocks/zetacore_client.go index 168b580ada..5efe76aeba 100644 --- a/zetaclient/testutils/mocks/zetacore_client.go +++ b/zetaclient/testutils/mocks/zetacore_client.go @@ -496,6 +496,64 @@ func (_m *ZetacoreClient) GetRateLimiterInput(ctx context.Context, window int64) return r0, r1 } +// GetTSS provides a mock function with given fields: ctx +func (_m *ZetacoreClient) GetTSS(ctx context.Context) (observertypes.TSS, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetTSS") + } + + var r0 observertypes.TSS + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (observertypes.TSS, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) observertypes.TSS); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(observertypes.TSS) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTSSHistory provides a mock function with given fields: ctx +func (_m *ZetacoreClient) GetTSSHistory(ctx context.Context) ([]observertypes.TSS, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetTSSHistory") + } + + var r0 []observertypes.TSS + var r1 error + if rf, ok := 
ret.Get(0).(func(context.Context) ([]observertypes.TSS, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) []observertypes.TSS); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]observertypes.TSS) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetZetaHotKeyBalance provides a mock function with given fields: ctx func (_m *ZetacoreClient) GetZetaHotKeyBalance(ctx context.Context) (math.Int, error) { ret := _m.Called(ctx) diff --git a/zetaclient/zetacore/client.go b/zetaclient/zetacore/client.go index 0806c709fe..c7149dd95f 100644 --- a/zetaclient/zetacore/client.go +++ b/zetaclient/zetacore/client.go @@ -391,7 +391,7 @@ func (c *Client) UpdateAppContext(ctx context.Context, appContext *zctx.AppConte return errors.Wrap(err, "unable to fetch crosschain flags from zetacore") } - tss, err := c.GetCurrentTSS(ctx) + tss, err := c.GetTSS(ctx) if err != nil { return errors.Wrap(err, "unable to fetch current TSS") } diff --git a/zetaclient/zetacore/client_query_observer.go b/zetaclient/zetacore/client_query_observer.go index 4d98ca848e..a8ed20c998 100644 --- a/zetaclient/zetacore/client_query_observer.go +++ b/zetaclient/zetacore/client_query_observer.go @@ -140,17 +140,7 @@ func (c *Client) GetBallot( return resp, nil } -// GetCurrentTSS returns the current TSS -func (c *Client) GetCurrentTSS(ctx context.Context) (types.TSS, error) { - resp, err := c.client.observer.TSS(ctx, &types.QueryGetTSSRequest{}) - if err != nil { - return types.TSS{}, errors.Wrap(err, "failed to get current tss") - } - - return resp.TSS, nil -} - -// GetEVMTSSAddress returns the EVM TSS address. +// GetEVMTSSAddress returns the current EVM TSS address. 
func (c *Client) GetEVMTSSAddress(ctx context.Context) (string, error) { resp, err := c.client.observer.GetTssAddress(ctx, &types.QueryGetTssAddressRequest{}) if err != nil { @@ -160,7 +150,7 @@ func (c *Client) GetEVMTSSAddress(ctx context.Context) (string, error) { return resp.Eth, nil } -// GetBTCTSSAddress returns the BTC TSS address +// GetBTCTSSAddress returns the current BTC TSS address func (c *Client) GetBTCTSSAddress(ctx context.Context, chainID int64) (string, error) { in := &types.QueryGetTssAddressRequest{BitcoinChainId: chainID} @@ -171,7 +161,16 @@ func (c *Client) GetBTCTSSAddress(ctx context.Context, chainID int64) (string, e return resp.Btc, nil } -// GetTSSHistory returns the TSS history +// GetTSS returns the current TSS +func (c *Client) GetTSS(ctx context.Context) (types.TSS, error) { + resp, err := c.client.observer.TSS(ctx, &types.QueryGetTSSRequest{}) + if err != nil { + return types.TSS{}, errors.Wrap(err, "failed to get tss") + } + return resp.TSS, nil +} + +// GetTSSHistory returns the historical list of TSS func (c *Client) GetTSSHistory(ctx context.Context) ([]types.TSS, error) { resp, err := c.client.observer.TssHistory(ctx, &types.QueryTssHistoryRequest{}) if err != nil { diff --git a/zetaclient/zetacore/client_query_test.go b/zetaclient/zetacore/client_query_test.go index ae995b069d..369e1bda3d 100644 --- a/zetaclient/zetacore/client_query_test.go +++ b/zetaclient/zetacore/client_query_test.go @@ -670,7 +670,7 @@ func TestZetacore_GetInboundTrackersForChain(t *testing.T) { require.Equal(t, expectedOutput.InboundTracker, resp) } -func TestZetacore_GetCurrentTss(t *testing.T) { +func TestZetacore_GetTss(t *testing.T) { ctx := context.Background() expectedOutput := observertypes.QueryGetTSSResponse{ @@ -688,7 +688,7 @@ func TestZetacore_GetCurrentTss(t *testing.T) { client := setupZetacoreClient(t, withDefaultObserverKeys()) - resp, err := client.GetCurrentTSS(ctx) + resp, err := client.GetTSS(ctx) require.NoError(t, err) 
require.Equal(t, expectedOutput.TSS, resp) }