Skip to content

Commit

Permalink
Move observer.Start() to orchestrator. Shutdown zetaclient if not an …
Browse files Browse the repository at this point in the history
…observer
  • Loading branch information
swift1337 committed Jul 18, 2024
1 parent 6932b13 commit e48c824
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 25 deletions.
55 changes: 30 additions & 25 deletions cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
zctx "github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
"github.com/zeta-chain/zetacore/zetaclient/orchestrator"
"github.com/zeta-chain/zetacore/zetaclient/zetacore"
)

type Multiaddr = core.Multiaddr
Expand Down Expand Up @@ -253,17 +254,15 @@ func start(_ *cobra.Command, _ []string) error {
startLogger.Error().Msgf("No chains enabled in updated config %s ", cfg.String())
}

observerList, err := zetacoreClient.GetObserverList(ctx)
if err != nil {
startLogger.Error().Err(err).Msg("GetObserverList error")
isObserver, err := isObserverNode(ctx, zetacoreClient)
switch {
case err != nil:
startLogger.Error().Msgf("Unable to determine if node is an observer")
return err
}
isNodeActive := false
for _, observer := range observerList {
if observer == zetacoreClient.GetKeys().GetOperatorAddress().String() {
isNodeActive = true
break
}
case !isObserver:
addr := zetacoreClient.GetKeys().GetOperatorAddress().String()
startLogger.Info().Str("operator_address", addr).Msg("This node is not an observer. Exit 0")
return nil
}

// CreateSignerMap: This creates a map of all signers for each chain.
Expand All @@ -281,23 +280,14 @@ func start(_ *cobra.Command, _ []string) error {
}
dbpath := filepath.Join(userDir, ".zetaclient/chainobserver")

// Creates a map of all chain observers for each chain. Each chain observer is responsible for observing events on the chain and processing them.
// Creates a map of all chain observers for each chain.
// Each chain observer is responsible for observing events on the chain and processing them.
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer)
if err != nil {
startLogger.Err(err).Msg("CreateChainObserverMap")
return err
}

if !isNodeActive {
startLogger.Error().
Msgf("Node %s is not an active observer external chain observers will not be started", zetacoreClient.GetKeys().GetOperatorAddress().String())
} else {
startLogger.Debug().Msgf("Node %s is an active observer starting external chain observers", zetacoreClient.GetKeys().GetOperatorAddress().String())
for _, observer := range observerMap {
observer.Start(ctx)
}
}

// Orchestrator wraps the zetacore client and adds the observers and signer maps to it.
// This is the high level object used for CCTX interactions
maestro, err := orchestrator.New(
Expand All @@ -313,6 +303,7 @@ func start(_ *cobra.Command, _ []string) error {
return err
}

// Start orchestrator with all observers and signers
if err := maestro.Start(ctx); err != nil {
startLogger.Error().Err(err).Msg("Unable to start orchestrator")
return err
Expand All @@ -338,10 +329,6 @@ func start(_ *cobra.Command, _ []string) error {
sig := <-ch
startLogger.Info().Msgf("stop signal received: %s", sig)

// stop chain observers
for _, observer := range observerMap {
observer.Stop()
}
zetacoreClient.Stop()

return nil
Expand Down Expand Up @@ -405,3 +392,21 @@ func promptPasswords() (string, string, error) {

return hotKeyPass, TSSKeyPass, err
}

// isObserverNode checks whether THIS node is an observer node.
func isObserverNode(ctx context.Context, client *zetacore.Client) (bool, error) {
observers, err := client.GetObserverList(ctx)
if err != nil {
return false, errors.Wrap(err, "unable to get observers list")
}

operatorAddress := client.GetKeys().GetOperatorAddress().String()

for _, observer := range observers {
if observer == operatorAddress {
return true, nil
}
}

return false, nil
}
4 changes: 4 additions & 0 deletions zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (oc *Orchestrator) Start(ctx context.Context) error {

oc.logger.Info().Str("signer", signerAddress.String()).Msg("Starting orchestrator")

for _, observer := range oc.observerMap {
observer.Start(ctx)
}

// start cctx scheduler
bg.Work(ctx, oc.StartCctxScheduler, bg.WithName("StartCctxScheduler"), bg.WithLogger(oc.logger.Logger))

Expand Down

0 comments on commit e48c824

Please sign in to comment.