diff --git a/cmd/zetaclientd/start.go b/cmd/zetaclientd/start.go index 6a4d0b6f4d..d2f4382308 100644 --- a/cmd/zetaclientd/start.go +++ b/cmd/zetaclientd/start.go @@ -77,7 +77,7 @@ func start(_ *cobra.Command, _ []string) error { } masterLogger := logger.Std - startLogger := masterLogger.With().Str("module", "startup").Logger() + startLogger := logger.Std.With().Str("module", "startup").Logger() appContext := zctx.New(cfg, masterLogger) ctx := zctx.WithAppContext(context.Background(), appContext) @@ -295,7 +295,9 @@ func start(_ *cobra.Command, _ []string) error { zetacoreClient, signerMap, observerMap, - masterLogger, + tss, + dbpath, + logger, telemetryServer, ) if err != nil { diff --git a/zetaclient/orchestrator/orchestrator.go b/zetaclient/orchestrator/orchestrator.go index b0965a9d1b..30c7217bd4 100644 --- a/zetaclient/orchestrator/orchestrator.go +++ b/zetaclient/orchestrator/orchestrator.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "math" + "sync" "time" sdkmath "cosmossdk.io/math" @@ -17,6 +18,7 @@ import ( zetamath "github.com/zeta-chain/zetacore/pkg/math" "github.com/zeta-chain/zetacore/x/crosschain/types" observertypes "github.com/zeta-chain/zetacore/x/observer/types" + "github.com/zeta-chain/zetacore/zetaclient/chains/base" btcobserver "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/observer" "github.com/zeta-chain/zetacore/zetaclient/chains/interfaces" zctx "github.com/zeta-chain/zetacore/zetaclient/context" @@ -55,10 +57,16 @@ type Orchestrator struct { // last operator balance lastOperatorBalance sdkmath.Int + // observer & signer props + tss interfaces.TSSSigner + dbDirectory string + baseLogger base.Logger + // misc logger multiLogger ts *metrics.TelemetryServer stop chan struct{} + mu sync.RWMutex } type multiLogger struct { @@ -72,7 +80,9 @@ func New( client interfaces.ZetacoreClient, signerMap map[int64]interfaces.ChainSigner, observerMap map[int64]interfaces.ChainObserver, - logger zerolog.Logger, + tss interfaces.TSSSigner, + dbDirectory string, + logger base.Logger, ts *metrics.TelemetryServer, ) (*Orchestrator, error) { if signerMap == nil || observerMap == nil { @@ -80,8 +90,8 @@ func New( } log := multiLogger{ - Logger: logger.With().Str("module", "orchestrator").Logger(), - Sampled: logger.With().Str("module", "orchestrator").Logger().Sample(defaultLogSampler), + Logger: logger.Std.With().Str("module", "orchestrator").Logger(), + Sampled: logger.Std.With().Str("module", "orchestrator").Logger().Sample(defaultLogSampler), } balance, err := client.GetZetaHotKeyBalance(ctx) @@ -95,9 +105,14 @@ func New( signerMap: signerMap, observerMap: observerMap, - outboundProc: outboundprocessor.NewProcessor(logger), + outboundProc: outboundprocessor.NewProcessor(logger.Std), lastOperatorBalance: balance, + // observer & signer props + tss: tss, + dbDirectory: dbDirectory, + baseLogger: logger, + logger: log, ts: ts, stop: make(chan struct{}), @@ -113,12 +128,9 @@ func (oc *Orchestrator) Start(ctx context.Context) error { oc.logger.Info().Str("signer", signerAddress.String()).Msg("Starting orchestrator") - for _, observer := range oc.observerMap { - observer.Start(ctx) - } - // start cctx scheduler - bg.Work(ctx, oc.StartCctxScheduler, bg.WithName("StartCctxScheduler"), bg.WithLogger(oc.logger.Logger)) + bg.Work(ctx, oc.runScheduler, bg.WithName("runScheduler"), bg.WithLogger(oc.logger.Logger)) + bg.Work(ctx, oc.runObserverSignerSync, bg.WithName("runObserverSignerSync"), bg.WithLogger(oc.logger.Logger)) shutdownOrchestrator := func() { // now stop orchestrator and all observers @@ -133,67 +145,97 @@ func (oc *Orchestrator) Start(ctx context.Context) error { return nil } -// GetUpdatedSigner returns signer with updated chain parameters -func (oc *Orchestrator) GetUpdatedSigner( - appContext *zctx.AppContext, - chainID int64, -) (interfaces.ChainSigner, error) { - signer, found := oc.signerMap[chainID] +// returns signer with updated chain parameters. +func (oc *Orchestrator) resolveSigner(app *zctx.AppContext, chainID int64) (interfaces.ChainSigner, error) { + signer, err := oc.getSigner(chainID) + if err != nil { + return nil, err + } + + // noop for non-EVM chains + if !chains.IsEVMChain(chainID, app.GetAdditionalChains()) { + return signer, nil + } + + evmParams, found := app.GetEVMChainParams(chainID) if !found { - return nil, fmt.Errorf("signer not found for chainID %d", chainID) + return signer, nil } - // update EVM signer parameters only. BTC signer doesn't use chain parameters for now. - if chains.IsEVMChain(chainID, appContext.GetAdditionalChains()) { - evmParams, found := appContext.GetEVMChainParams(chainID) - if found { - // update zeta connector and ERC20 custody addresses - zetaConnectorAddress := ethcommon.HexToAddress(evmParams.GetConnectorContractAddress()) - erc20CustodyAddress := ethcommon.HexToAddress(evmParams.GetErc20CustodyContractAddress()) - if zetaConnectorAddress != signer.GetZetaConnectorAddress() { - signer.SetZetaConnectorAddress(zetaConnectorAddress) - oc.logger.Info().Msgf( - "updated zeta connector address for chainID %d, new address: %s", chainID, zetaConnectorAddress) - } - if erc20CustodyAddress != signer.GetERC20CustodyAddress() { - signer.SetERC20CustodyAddress(erc20CustodyAddress) - oc.logger.Info().Msgf( - "updated ERC20 custody address for chainID %d, new address: %s", chainID, erc20CustodyAddress) - } - } + + // update zeta connector and ERC20 custody addresses + zetaConnectorAddress := ethcommon.HexToAddress(evmParams.GetConnectorContractAddress()) + if zetaConnectorAddress != signer.GetZetaConnectorAddress() { + signer.SetZetaConnectorAddress(zetaConnectorAddress) + oc.logger.Info(). + Str("signer.connector_address", zetaConnectorAddress.String()). + Msgf("updated zeta connector address for chain %d", chainID) } + + erc20CustodyAddress := ethcommon.HexToAddress(evmParams.GetErc20CustodyContractAddress()) + if erc20CustodyAddress != signer.GetERC20CustodyAddress() { + signer.SetERC20CustodyAddress(erc20CustodyAddress) + oc.logger.Info(). + Str("signer.erc20_custody", erc20CustodyAddress.String()). + Msgf("updated zeta connector address for chain %d", chainID) + } + return signer, nil } -// GetUpdatedChainObserver returns chain observer with updated chain parameters -func (oc *Orchestrator) GetUpdatedChainObserver( - appContext *zctx.AppContext, - chainID int64, -) (interfaces.ChainObserver, error) { - observer, found := oc.observerMap[chainID] +func (oc *Orchestrator) getSigner(chainID int64) (interfaces.ChainSigner, error) { + oc.mu.RLock() + defer oc.mu.RUnlock() + + s, found := oc.signerMap[chainID] if !found { - return nil, fmt.Errorf("chain observer not found for chainID %d", chainID) + return nil, fmt.Errorf("signer not found for chainID %d", chainID) + } + + return s, nil +} + +// returns chain observer with updated chain parameters +func (oc *Orchestrator) resolveObserver(app *zctx.AppContext, chainID int64) (interfaces.ChainObserver, error) { + observer, err := oc.getObserver(chainID) + if err != nil { + return nil, err } + // update chain observer chain parameters curParams := observer.GetChainParams() - if chains.IsEVMChain(chainID, appContext.GetAdditionalChains()) { - evmParams, found := appContext.GetEVMChainParams(chainID) + if chains.IsEVMChain(chainID, app.GetAdditionalChains()) { + evmParams, found := app.GetEVMChainParams(chainID) if found && !observertypes.ChainParamsEqual(curParams, *evmParams) { observer.SetChainParams(*evmParams) - oc.logger.Info().Msgf( - "updated chain params for chainID %d, new params: %v", chainID, *evmParams) + oc.logger.Info(). + Interface("observer.chain_params", *evmParams). + Msgf("updated chain params for EVM chainID %d", chainID) } - } else if chains.IsBitcoinChain(chainID, appContext.GetAdditionalChains()) { - _, btcParams, found := appContext.GetBTCChainParams() - + } else if chains.IsBitcoinChain(chainID, app.GetAdditionalChains()) { + _, btcParams, found := app.GetBTCChainParams() if found && !observertypes.ChainParamsEqual(curParams, *btcParams) { observer.SetChainParams(*btcParams) - oc.logger.Info().Msgf( - "updated chain params for Bitcoin, new params: %v", *btcParams) + oc.logger.Info(). + Interface("observer.chain_params", *btcParams). + Msgf("updated chain params for UTXO chainID %d", btcParams.ChainId) } } + return observer, nil } +func (oc *Orchestrator) getObserver(chainID int64) (interfaces.ChainObserver, error) { + oc.mu.RLock() + defer oc.mu.RUnlock() + + ob, found := oc.observerMap[chainID] + if !found { + return nil, fmt.Errorf("observer not found for chainID %d", chainID) + } + + return ob, nil +} + // GetPendingCctxsWithinRateLimit get pending cctxs across foreign chains within rate limit func (oc *Orchestrator) GetPendingCctxsWithinRateLimit( ctx context.Context, @@ -245,9 +287,9 @@ func (oc *Orchestrator) GetPendingCctxsWithinRateLimit( return output.CctxsMap, nil } -// StartCctxScheduler schedules keysigns for cctxs on each ZetaChain block (the ticker) +// schedules keysigns for cctxs on each ZetaChain block (the ticker) // TODO(revamp): make this function simpler -func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error { +func (oc *Orchestrator) runScheduler(ctx context.Context) error { app, err := zctx.FromContext(ctx) if err != nil { return err @@ -258,7 +300,7 @@ func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error { for { select { case <-oc.stop: - oc.logger.Warn().Msg("StartCctxScheduler: stopped") + oc.logger.Warn().Msg("runScheduler: stopped") return nil case <-observeTicker.C: { @@ -268,7 +310,7 @@ func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error { continue } if bn < 0 { - oc.logger.Error().Msg("StartCctxScheduler: GetBlockHeight returned negative height") + oc.logger.Error().Msg("runScheduler: GetBlockHeight returned negative height") continue } if lastBlockNum == 0 { @@ -277,7 +319,7 @@ func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error { if bn > lastBlockNum { // we have a new block bn = lastBlockNum + 1 if bn%10 == 0 { - oc.logger.Debug().Msgf("StartCctxScheduler: zetacore heart beat: %d", bn) + oc.logger.Debug().Msgf("runScheduler: zetacore heart beat: %d", bn) } balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx) @@ -300,7 +342,7 @@ func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error { // query pending cctxs across all external chains within rate limit cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, externalChains) if err != nil { - oc.logger.Error().Err(err).Msgf("StartCctxScheduler: GetPendingCctxsWithinRatelimit failed") + oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed") } // schedule keysign for pending cctxs on each chain @@ -313,18 +355,16 @@ func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error { } // update chain parameters for signer and chain observer - signer, err := oc.GetUpdatedSigner(app, c.ChainId) + signer, err := oc.resolveSigner(app, c.ChainId) if err != nil { - oc.logger.Error(). - Err(err). - Msgf("StartCctxScheduler: GetUpdatedSigner failed for chain %d", c.ChainId) + oc.logger.Error().Err(err). + Msgf("runScheduler: unable to resolve signer for chain %d", c.ChainId) continue } - ob, err := oc.GetUpdatedChainObserver(app, c.ChainId) + ob, err := oc.resolveObserver(app, c.ChainId) if err != nil { - oc.logger.Error(). - Err(err). - Msgf("StartCctxScheduler: GetUpdatedChainObserver failed for chain %d", c.ChainId) + oc.logger.Error().Err(err). + Msgf("runScheduler: resolveObserver failed for chain %d", c.ChainId) continue } if !app.IsOutboundObservationEnabled(ob.GetChainParams()) { @@ -525,3 +565,58 @@ func (oc *Orchestrator) ScheduleCctxBTC( } } } + +func (oc *Orchestrator) runObserverSignerSync(ctx context.Context) error { + const cadence = 5 * time.Second + + ticker := time.NewTicker(cadence) + defer ticker.Stop() + + for { + select { + case <-oc.stop: + oc.logger.Warn().Msg("runObserverSignerSync: stopped") + return nil + case <-ticker.C: + if err := oc.syncObserverSigner(ctx); err != nil { + oc.logger.Error().Err(err).Msg("runObserverSignerSync: syncObserverSigner failed") + } + } + } +} + +// syncs and provisions observers & signers. +// Note that zctx.AppContext Update is a responsibility of another component +// See zetacore.Client{}.UpdateZetacoreContextWorker +func (oc *Orchestrator) syncObserverSigner(ctx context.Context) error { + oc.mu.Lock() + defer oc.mu.Unlock() + + client := oc.zetacoreClient + + added, removed, err := syncObserverMap(ctx, client, oc.tss, oc.dbDirectory, oc.baseLogger, oc.ts, &oc.observerMap) + if err != nil { + return errors.Wrap(err, "syncObserverMap failed") + } + + if added+removed > 0 { + oc.logger.Info(). + Int("observer.added", added). + Int("observer.removed", removed). + Msg("synced observers") + } + + added, removed, err = syncSignerMap(ctx, oc.tss, oc.baseLogger, oc.ts, &oc.signerMap) + if err != nil { + return errors.Wrap(err, "syncSignerMap failed") + } + + if added+removed > 0 { + oc.logger.Info(). + Int("signers.added", added). + Int("signers.removed", removed). + Msg("synced signers") + } + + return nil +} diff --git a/zetaclient/orchestrator/orchestrator_test.go b/zetaclient/orchestrator/orchestrator_test.go index 8637834a6e..2a0ba0ade7 100644 --- a/zetaclient/orchestrator/orchestrator_test.go +++ b/zetaclient/orchestrator/orchestrator_test.go @@ -111,14 +111,14 @@ func Test_GetUpdatedSigner(t *testing.T) { orchestrator := MockOrchestrator(t, nil, evmChain, btcChain, evmChainParams, btcChainParams) context := CreateAppContext(evmChain, btcChain, evmChainParamsNew, btcChainParams) // BSC signer should not be found - _, err := orchestrator.GetUpdatedSigner(context, chains.BscMainnet.ChainId) + _, err := orchestrator.resolveSigner(context, chains.BscMainnet.ChainId) require.ErrorContains(t, err, "signer not found") }) t.Run("should be able to update connector and erc20 custody address", func(t *testing.T) { orchestrator := MockOrchestrator(t, nil, evmChain, btcChain, evmChainParams, btcChainParams) context := CreateAppContext(evmChain, btcChain, evmChainParamsNew, btcChainParams) // update signer with new connector and erc20 custody address - signer, err := orchestrator.GetUpdatedSigner(context, evmChain.ChainId) + signer, err := orchestrator.resolveSigner(context, evmChain.ChainId) require.NoError(t, err) require.Equal(t, testutils.OtherAddress1, signer.GetZetaConnectorAddress().Hex()) require.Equal(t, testutils.OtherAddress2, signer.GetERC20CustodyAddress().Hex()) @@ -176,14 +176,14 @@ func Test_GetUpdatedChainObserver(t *testing.T) { orchestrator := MockOrchestrator(t, nil, evmChain, btcChain, evmChainParams, btcChainParams) coreContext := CreateAppContext(evmChain, btcChain, evmChainParamsNew, btcChainParams) // BSC chain observer should not be found - _, err := orchestrator.GetUpdatedChainObserver(coreContext, chains.BscMainnet.ChainId) - require.ErrorContains(t, err, "chain observer not found") + _, err := orchestrator.resolveObserver(coreContext, chains.BscMainnet.ChainId) + require.ErrorContains(t, err, "observer not found") }) t.Run("chain params in evm chain observer should be updated successfully", func(t *testing.T) { orchestrator := MockOrchestrator(t, nil, evmChain, btcChain, evmChainParams, btcChainParams) coreContext := CreateAppContext(evmChain, btcChain, evmChainParamsNew, btcChainParams) // update evm chain observer with new chain params - chainOb, err := orchestrator.GetUpdatedChainObserver(coreContext, evmChain.ChainId) + chainOb, err := orchestrator.resolveObserver(coreContext, evmChain.ChainId) require.NoError(t, err) require.NotNil(t, chainOb) require.True(t, observertypes.ChainParamsEqual(*evmChainParamsNew, chainOb.GetChainParams())) @@ -192,14 +192,14 @@ func Test_GetUpdatedChainObserver(t *testing.T) { orchestrator := MockOrchestrator(t, nil, evmChain, btcChain, evmChainParams, btcChainParams) coreContext := CreateAppContext(btcChain, btcChain, evmChainParams, btcChainParamsNew) // BTC testnet chain observer should not be found - _, err := orchestrator.GetUpdatedChainObserver(coreContext, chains.BitcoinTestnet.ChainId) - require.ErrorContains(t, err, "chain observer not found") + _, err := orchestrator.resolveObserver(coreContext, chains.BitcoinTestnet.ChainId) + require.ErrorContains(t, err, "observer not found") }) t.Run("chain params in btc chain observer should be updated successfully", func(t *testing.T) { orchestrator := MockOrchestrator(t, nil, evmChain, btcChain, evmChainParams, btcChainParams) coreContext := CreateAppContext(btcChain, btcChain, evmChainParams, btcChainParamsNew) // update btc chain observer with new chain params - chainOb, err := orchestrator.GetUpdatedChainObserver(coreContext, btcChain.ChainId) + chainOb, err := orchestrator.resolveObserver(coreContext, btcChain.ChainId) require.NoError(t, err) require.NotNil(t, chainOb) require.True(t, observertypes.ChainParamsEqual(*btcChainParamsNew, chainOb.GetChainParams()))