Skip to content

Commit

Permalink
Implement observer & signer sync worker
Browse files Browse the repository at this point in the history
  • Loading branch information
swift1337 committed Jul 19, 2024
1 parent f71991c commit 6778164
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 74 deletions.
6 changes: 4 additions & 2 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func start(_ *cobra.Command, _ []string) error {
}

masterLogger := logger.Std
startLogger := masterLogger.With().Str("module", "startup").Logger()
startLogger := logger.Std.With().Str("module", "startup").Logger()

appContext := zctx.New(cfg, masterLogger)
ctx := zctx.WithAppContext(context.Background(), appContext)
Expand Down Expand Up @@ -295,7 +295,9 @@ func start(_ *cobra.Command, _ []string) error {
zetacoreClient,
signerMap,
observerMap,
masterLogger,
tss,
dbpath,
logger,
telemetryServer,
)
if err != nil {
Expand Down
223 changes: 159 additions & 64 deletions zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"math"
"sync"
"time"

sdkmath "cosmossdk.io/math"
Expand All @@ -17,6 +18,7 @@ import (
zetamath "github.com/zeta-chain/zetacore/pkg/math"
"github.com/zeta-chain/zetacore/x/crosschain/types"
observertypes "github.com/zeta-chain/zetacore/x/observer/types"
"github.com/zeta-chain/zetacore/zetaclient/chains/base"
btcobserver "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/observer"
"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
zctx "github.com/zeta-chain/zetacore/zetaclient/context"
Expand Down Expand Up @@ -55,10 +57,16 @@ type Orchestrator struct {
// last operator balance
lastOperatorBalance sdkmath.Int

// observer & signer props
tss interfaces.TSSSigner
dbDirectory string
baseLogger base.Logger

// misc
logger multiLogger
ts *metrics.TelemetryServer
stop chan struct{}
mu sync.RWMutex
}

type multiLogger struct {
Expand All @@ -72,16 +80,18 @@ func New(
client interfaces.ZetacoreClient,
signerMap map[int64]interfaces.ChainSigner,
observerMap map[int64]interfaces.ChainObserver,
logger zerolog.Logger,
tss interfaces.TSSSigner,
dbDirectory string,
logger base.Logger,
ts *metrics.TelemetryServer,
) (*Orchestrator, error) {
if signerMap == nil || observerMap == nil {
return nil, errors.New("signerMap or observerMap is nil")
}

log := multiLogger{
Logger: logger.With().Str("module", "orchestrator").Logger(),
Sampled: logger.With().Str("module", "orchestrator").Logger().Sample(defaultLogSampler),
Logger: logger.Std.With().Str("module", "orchestrator").Logger(),
Sampled: logger.Std.With().Str("module", "orchestrator").Logger().Sample(defaultLogSampler),
}

balance, err := client.GetZetaHotKeyBalance(ctx)
Expand All @@ -95,9 +105,14 @@ func New(
signerMap: signerMap,
observerMap: observerMap,

outboundProc: outboundprocessor.NewProcessor(logger),
outboundProc: outboundprocessor.NewProcessor(logger.Std),
lastOperatorBalance: balance,

// observer & signer props
tss: tss,
dbDirectory: dbDirectory,
baseLogger: logger,

logger: log,
ts: ts,
stop: make(chan struct{}),
Expand All @@ -113,12 +128,9 @@ func (oc *Orchestrator) Start(ctx context.Context) error {

oc.logger.Info().Str("signer", signerAddress.String()).Msg("Starting orchestrator")

for _, observer := range oc.observerMap {
observer.Start(ctx)
}

// start cctx scheduler
bg.Work(ctx, oc.StartCctxScheduler, bg.WithName("StartCctxScheduler"), bg.WithLogger(oc.logger.Logger))
bg.Work(ctx, oc.runScheduler, bg.WithName("runScheduler"), bg.WithLogger(oc.logger.Logger))
bg.Work(ctx, oc.runObserverSignerSync, bg.WithName("runObserverSignerSync"), bg.WithLogger(oc.logger.Logger))

shutdownOrchestrator := func() {
// now stop orchestrator and all observers
Expand All @@ -133,67 +145,97 @@ func (oc *Orchestrator) Start(ctx context.Context) error {
return nil
}

// GetUpdatedSigner returns signer with updated chain parameters
func (oc *Orchestrator) GetUpdatedSigner(
appContext *zctx.AppContext,
chainID int64,
) (interfaces.ChainSigner, error) {
signer, found := oc.signerMap[chainID]
// returns signer with updated chain parameters.
func (oc *Orchestrator) resolveSigner(app *zctx.AppContext, chainID int64) (interfaces.ChainSigner, error) {
signer, err := oc.getSigner(chainID)
if err != nil {
return nil, err
}

// noop for non-EVM chains
if !chains.IsEVMChain(chainID, app.GetAdditionalChains()) {
return signer, nil
}

evmParams, found := app.GetEVMChainParams(chainID)
if !found {
return nil, fmt.Errorf("signer not found for chainID %d", chainID)
return signer, nil
}
// update EVM signer parameters only. BTC signer doesn't use chain parameters for now.
if chains.IsEVMChain(chainID, appContext.GetAdditionalChains()) {
evmParams, found := appContext.GetEVMChainParams(chainID)
if found {
// update zeta connector and ERC20 custody addresses
zetaConnectorAddress := ethcommon.HexToAddress(evmParams.GetConnectorContractAddress())
erc20CustodyAddress := ethcommon.HexToAddress(evmParams.GetErc20CustodyContractAddress())
if zetaConnectorAddress != signer.GetZetaConnectorAddress() {
signer.SetZetaConnectorAddress(zetaConnectorAddress)
oc.logger.Info().Msgf(
"updated zeta connector address for chainID %d, new address: %s", chainID, zetaConnectorAddress)
}
if erc20CustodyAddress != signer.GetERC20CustodyAddress() {
signer.SetERC20CustodyAddress(erc20CustodyAddress)
oc.logger.Info().Msgf(
"updated ERC20 custody address for chainID %d, new address: %s", chainID, erc20CustodyAddress)
}
}

// update zeta connector and ERC20 custody addresses
zetaConnectorAddress := ethcommon.HexToAddress(evmParams.GetConnectorContractAddress())
if zetaConnectorAddress != signer.GetZetaConnectorAddress() {
signer.SetZetaConnectorAddress(zetaConnectorAddress)
oc.logger.Info().
Str("signer.connector_address", zetaConnectorAddress.String()).
Msgf("updated zeta connector address for chain %d", chainID)
}

erc20CustodyAddress := ethcommon.HexToAddress(evmParams.GetErc20CustodyContractAddress())
if erc20CustodyAddress != signer.GetERC20CustodyAddress() {
signer.SetERC20CustodyAddress(erc20CustodyAddress)
oc.logger.Info().
Str("signer.erc20_custody", erc20CustodyAddress.String()).
Msgf("updated zeta connector address for chain %d", chainID)
}

return signer, nil
}

// GetUpdatedChainObserver returns chain observer with updated chain parameters
func (oc *Orchestrator) GetUpdatedChainObserver(
appContext *zctx.AppContext,
chainID int64,
) (interfaces.ChainObserver, error) {
observer, found := oc.observerMap[chainID]
func (oc *Orchestrator) getSigner(chainID int64) (interfaces.ChainSigner, error) {
oc.mu.RLock()
defer oc.mu.RUnlock()

s, found := oc.signerMap[chainID]
if !found {
return nil, fmt.Errorf("chain observer not found for chainID %d", chainID)
return nil, fmt.Errorf("signer not found for chainID %d", chainID)
}

return s, nil
}

// returns chain observer with updated chain parameters
func (oc *Orchestrator) resolveObserver(app *zctx.AppContext, chainID int64) (interfaces.ChainObserver, error) {
observer, err := oc.getObserver(chainID)
if err != nil {
return nil, err
}

// update chain observer chain parameters
curParams := observer.GetChainParams()
if chains.IsEVMChain(chainID, appContext.GetAdditionalChains()) {
evmParams, found := appContext.GetEVMChainParams(chainID)
if chains.IsEVMChain(chainID, app.GetAdditionalChains()) {
evmParams, found := app.GetEVMChainParams(chainID)
if found && !observertypes.ChainParamsEqual(curParams, *evmParams) {
observer.SetChainParams(*evmParams)
oc.logger.Info().Msgf(
"updated chain params for chainID %d, new params: %v", chainID, *evmParams)
oc.logger.Info().
Interface("observer.chain_params", *evmParams).
Msgf("updated chain params for EVM chainID %d", chainID)
}
} else if chains.IsBitcoinChain(chainID, appContext.GetAdditionalChains()) {
_, btcParams, found := appContext.GetBTCChainParams()

} else if chains.IsBitcoinChain(chainID, app.GetAdditionalChains()) {
_, btcParams, found := app.GetBTCChainParams()
if found && !observertypes.ChainParamsEqual(curParams, *btcParams) {
observer.SetChainParams(*btcParams)
oc.logger.Info().Msgf(
"updated chain params for Bitcoin, new params: %v", *btcParams)
oc.logger.Info().
Interface("observer.chain_params", *btcParams).
Msgf("updated chain params for UTXO chainID %d", btcParams.ChainId)
}
}

return observer, nil
}

func (oc *Orchestrator) getObserver(chainID int64) (interfaces.ChainObserver, error) {
oc.mu.RLock()
defer oc.mu.RUnlock()

ob, found := oc.observerMap[chainID]
if !found {
return nil, fmt.Errorf("observer not found for chainID %d", chainID)
}

return ob, nil
}

// GetPendingCctxsWithinRateLimit get pending cctxs across foreign chains within rate limit
func (oc *Orchestrator) GetPendingCctxsWithinRateLimit(
ctx context.Context,
Expand Down Expand Up @@ -245,9 +287,9 @@ func (oc *Orchestrator) GetPendingCctxsWithinRateLimit(
return output.CctxsMap, nil
}

// StartCctxScheduler schedules keysigns for cctxs on each ZetaChain block (the ticker)
// schedules keysigns for cctxs on each ZetaChain block (the ticker)
// TODO(revamp): make this function simpler
func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error {
func (oc *Orchestrator) runScheduler(ctx context.Context) error {
app, err := zctx.FromContext(ctx)
if err != nil {
return err
Expand All @@ -258,7 +300,7 @@ func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error {
for {
select {
case <-oc.stop:
oc.logger.Warn().Msg("StartCctxScheduler: stopped")
oc.logger.Warn().Msg("runScheduler: stopped")
return nil
case <-observeTicker.C:
{
Expand All @@ -268,7 +310,7 @@ func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error {
continue
}
if bn < 0 {
oc.logger.Error().Msg("StartCctxScheduler: GetBlockHeight returned negative height")
oc.logger.Error().Msg("runScheduler: GetBlockHeight returned negative height")
continue
}
if lastBlockNum == 0 {
Expand All @@ -277,7 +319,7 @@ func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error {
if bn > lastBlockNum { // we have a new block
bn = lastBlockNum + 1
if bn%10 == 0 {
oc.logger.Debug().Msgf("StartCctxScheduler: zetacore heart beat: %d", bn)
oc.logger.Debug().Msgf("runScheduler: zetacore heart beat: %d", bn)
}

balance, err := oc.zetacoreClient.GetZetaHotKeyBalance(ctx)
Expand All @@ -300,7 +342,7 @@ func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error {
// query pending cctxs across all external chains within rate limit
cctxMap, err := oc.GetPendingCctxsWithinRateLimit(ctx, externalChains)
if err != nil {
oc.logger.Error().Err(err).Msgf("StartCctxScheduler: GetPendingCctxsWithinRatelimit failed")
oc.logger.Error().Err(err).Msgf("runScheduler: GetPendingCctxsWithinRatelimit failed")
}

// schedule keysign for pending cctxs on each chain
Expand All @@ -313,18 +355,16 @@ func (oc *Orchestrator) StartCctxScheduler(ctx context.Context) error {
}

// update chain parameters for signer and chain observer
signer, err := oc.GetUpdatedSigner(app, c.ChainId)
signer, err := oc.resolveSigner(app, c.ChainId)
if err != nil {
oc.logger.Error().
Err(err).
Msgf("StartCctxScheduler: GetUpdatedSigner failed for chain %d", c.ChainId)
oc.logger.Error().Err(err).
Msgf("runScheduler: unable to resolve signer for chain %d", c.ChainId)
continue
}
ob, err := oc.GetUpdatedChainObserver(app, c.ChainId)
ob, err := oc.resolveObserver(app, c.ChainId)
if err != nil {
oc.logger.Error().
Err(err).
Msgf("StartCctxScheduler: GetUpdatedChainObserver failed for chain %d", c.ChainId)
oc.logger.Error().Err(err).
Msgf("runScheduler: resolveObserver failed for chain %d", c.ChainId)
continue
}
if !app.IsOutboundObservationEnabled(ob.GetChainParams()) {
Expand Down Expand Up @@ -525,3 +565,58 @@ func (oc *Orchestrator) ScheduleCctxBTC(
}
}
}

func (oc *Orchestrator) runObserverSignerSync(ctx context.Context) error {
const cadence = 5 * time.Second

ticker := time.NewTicker(cadence)
defer ticker.Stop()

for {
select {
case <-oc.stop:
oc.logger.Warn().Msg("runObserverSignerSync: stopped")
return nil
case <-ticker.C:
if err := oc.syncObserverSigner(ctx); err != nil {
oc.logger.Error().Err(err).Msg("runObserverSignerSync: syncObserverSigner failed")
}
}
}
}

// syncs and provisions observers & signers.
// Note that zctx.AppContext Update is a responsibility of another component
// See zetacore.Client{}.UpdateZetacoreContextWorker
func (oc *Orchestrator) syncObserverSigner(ctx context.Context) error {
oc.mu.Lock()
defer oc.mu.Unlock()

client := oc.zetacoreClient

added, removed, err := syncObserverMap(ctx, client, oc.tss, oc.dbDirectory, oc.baseLogger, oc.ts, &oc.observerMap)
if err != nil {
return errors.Wrap(err, "syncObserverMap failed")
}

if added+removed > 0 {
oc.logger.Info().
Int("observer.added", added).
Int("observer.removed", removed).
Msg("synced observers")
}

added, removed, err = syncSignerMap(ctx, oc.tss, oc.baseLogger, oc.ts, &oc.signerMap)
if err != nil {
return errors.Wrap(err, "syncSignerMap failed")
}

if added+removed > 0 {
oc.logger.Info().
Int("signers.added", added).
Int("signers.removed", removed).
Msg("synced signers")
}

return nil
}
Loading

0 comments on commit 6778164

Please sign in to comment.