Skip to content

Commit

Permalink
Move observer map creation to orchestrator
Browse files Browse the repository at this point in the history
  • Loading branch information
swift1337 committed Jul 18, 2024
1 parent 4ab3236 commit 6932b13
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 145 deletions.
2 changes: 1 addition & 1 deletion cmd/zetaclientd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func start(_ *cobra.Command, _ []string) error {
dbpath := filepath.Join(userDir, ".zetaclient/chainobserver")

// Creates a map of all chain observers for each chain. Each chain observer is responsible for observing events on the chain and processing them.
observerMap, err := CreateChainObserverMap(ctx, appContext, zetacoreClient, tss, dbpath, logger, telemetryServer)
observerMap, err := orchestrator.CreateChainObserverMap(ctx, zetacoreClient, tss, dbpath, logger, telemetryServer)
if err != nil {
startLogger.Err(err).Msg("CreateChainObserverMap")
return err
Expand Down
139 changes: 0 additions & 139 deletions cmd/zetaclientd/utils.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,15 @@
package main

import (
gocontext "context"
"fmt"

"cosmossdk.io/errors"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/rs/zerolog"

"github.com/zeta-chain/zetacore/pkg/chains"
observertypes "github.com/zeta-chain/zetacore/x/observer/types"
"github.com/zeta-chain/zetacore/zetaclient/authz"
"github.com/zeta-chain/zetacore/zetaclient/chains/base"
btcobserver "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/observer"
btcrpc "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/rpc"
evmobserver "github.com/zeta-chain/zetacore/zetaclient/chains/evm/observer"
"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
"github.com/zeta-chain/zetacore/zetaclient/config"
"github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/db"
"github.com/zeta-chain/zetacore/zetaclient/keys"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
"github.com/zeta-chain/zetacore/zetaclient/zetacore"
)

// backwards compatibility
const btcDatabaseFilename = "btc_chain_client"

func CreateAuthzSigner(granter string, grantee sdk.AccAddress) {
authz.SetupAuthZSignerList(granter, grantee)
}
Expand Down Expand Up @@ -59,124 +41,3 @@ func CreateZetacoreClient(cfg config.Config, hotkeyPassword string, logger zerol

return client, nil
}

// CreateChainObserverMap creates a map of ChainObservers for all chains in the config
func CreateChainObserverMap(
ctx gocontext.Context,
appContext *context.AppContext,
zetacoreClient *zetacore.Client,
tss interfaces.TSSSigner,
dbpath string,
logger base.Logger,
ts *metrics.TelemetryServer,
) (map[int64]interfaces.ChainObserver, error) {
observerMap := make(map[int64]interfaces.ChainObserver)
// EVM observers
for _, evmConfig := range appContext.Config().GetAllEVMConfigs() {
if evmConfig.Chain.IsZetaChain() {
continue
}
chainParams, found := appContext.GetEVMChainParams(evmConfig.Chain.ChainId)
if !found {
logger.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String())
continue
}

// create EVM client
evmClient, err := ethclient.Dial(evmConfig.Endpoint)
if err != nil {
logger.Std.Error().Err(err).Msgf("error dailing endpoint %q", evmConfig.Endpoint)
continue
}

chainName := evmConfig.Chain.ChainName.String()

database, err := db.NewFromSqlite(dbpath, chainName, true)
if err != nil {
logger.Std.Error().Err(err).Msgf("Unable to open a database for EVM chain %q", chainName)
}

// create EVM chain observer
observer, err := evmobserver.NewObserver(
ctx,
evmConfig,
evmClient,
*chainParams,
zetacoreClient,
tss,
database,
logger,
ts,
)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewObserver error for EVM chain %s", evmConfig.Chain.String())
continue
}
observerMap[evmConfig.Chain.ChainId] = observer
}

// create BTC chain observer
btcChain, btcConfig, btcEnabled := appContext.GetBTCChainAndConfig()
if btcEnabled {
_, chainParams, found := appContext.GetBTCChainParams()
if !found {
return nil, fmt.Errorf("BTC is enabled, but chains params not found")
}

btcObserver, err := createBTCObserver(
dbpath,
btcConfig,
btcChain,
*chainParams,
zetacoreClient,
tss,
logger,
ts,
)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewObserver error for BTC chain %s", btcChain.ChainName.String())
} else {
observerMap[btcChain.ChainId] = btcObserver
}
}

return observerMap, nil
}

func createBTCObserver(
dbPath string,
cfg config.BTCConfig,
chain chains.Chain,
chainParams observertypes.ChainParams,
client *zetacore.Client,
tss interfaces.TSSSigner,
logger base.Logger,
ts *metrics.TelemetryServer,
) (*btcobserver.Observer, error) {
btcClient, err := btcrpc.NewRPCClient(cfg)
if err != nil {
return nil, errors.Wrap(err, "unable to create rpc client for BTC chain")
}

database, err := db.NewFromSqlite(dbPath, btcDatabaseFilename, true)
if err != nil {
return nil, errors.Wrap(err, "unable to open a database for BTC chain")
}

// create BTC chain observer
observer, err := btcobserver.NewObserver(
chain,
btcClient,
chainParams,
client,
tss,
database,
logger,
ts,
)
if err != nil {
return nil, errors.Wrap(err, "unable to create observer for BTC chain")
}

return observer, nil
}
30 changes: 25 additions & 5 deletions zetaclient/chains/base/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ type Observer struct {

// mu protects fields from concurrent access
// Note: base observer simply provides the mutex. It's the sub-struct's responsibility to use it to be thread-safe
mu *sync.Mutex
mu *sync.Mutex
started bool

// stop is the channel to signal the observer to stop
stop chan struct{}
Expand Down Expand Up @@ -120,17 +121,36 @@ func NewObserver(
return &ob, nil
}

// Start starts the observer. Returns true if the observer was already started (noop).
func (ob *Observer) Start() bool {
ob.mu.Lock()
defer ob.Mu().Unlock()

Check warning on line 127 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L125-L127

Added lines #L125 - L127 were not covered by tests

// noop
if ob.started {
return true

Check warning on line 131 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L130-L131

Added lines #L130 - L131 were not covered by tests
}

ob.started = true

Check warning on line 134 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L134

Added line #L134 was not covered by tests

return false

Check warning on line 136 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L136

Added line #L136 was not covered by tests
}

// Stop notifies all goroutines to stop and closes the database.
func (ob *Observer) Stop() {
ob.logger.Chain.Info().Msgf("observer is stopping for chain %d", ob.Chain().ChainId)
close(ob.stop)

ob.mu.Lock()
defer ob.mu.Unlock()

ob.started = false

// close database
if ob.db != nil {
if err := ob.db.Close(); err != nil {
ob.Logger().Chain.Error().Err(err).Msgf("unable to close db for chain %d", ob.Chain().ChainId)
}
if err := ob.db.Close(); err != nil {
ob.Logger().Chain.Error().Err(err).Msgf("unable to close db for chain %d", ob.Chain().ChainId)

Check warning on line 151 in zetaclient/chains/base/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/base/observer.go#L151

Added line #L151 was not covered by tests
}

ob.Logger().Chain.Info().Msgf("observer stopped for chain %d", ob.Chain().ChainId)
}

Expand Down
8 changes: 8 additions & 0 deletions zetaclient/chains/bitcoin/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,14 @@ func (ob *Observer) GetChainParams() observertypes.ChainParams {

// Start starts the Go routine processes to observe the Bitcoin chain
func (ob *Observer) Start(ctx context.Context) {
ob.Mu().Lock()
defer ob.Mu().Unlock()

Check warning on line 204 in zetaclient/chains/bitcoin/observer/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/bitcoin/observer/observer.go#L203-L204

Added lines #L203 - L204 were not covered by tests

if noop := ob.Observer.Start(); noop {
ob.Logger().Chain.Info().Msgf("observer is already started for chain %d", ob.Chain().ChainId)
return

Check warning on line 208 in zetaclient/chains/bitcoin/observer/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/bitcoin/observer/observer.go#L206-L208

Added lines #L206 - L208 were not covered by tests
}

ob.Logger().Chain.Info().Msgf("observer is starting for chain %d", ob.Chain().ChainId)

// watch bitcoin chain for incoming txs and post votes to zetacore
Expand Down
5 changes: 5 additions & 0 deletions zetaclient/chains/evm/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func FetchZetaTokenContract(

// Start all observation routines for the evm chain
func (ob *Observer) Start(ctx context.Context) {
if noop := ob.Observer.Start(); noop {
ob.Logger().Chain.Info().Msgf("observer is already started for chain %d", ob.Chain().ChainId)
return

Check warning on line 172 in zetaclient/chains/evm/observer/observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/chains/evm/observer/observer.go#L170-L172

Added lines #L170 - L172 were not covered by tests
}

ob.Logger().Chain.Info().Msgf("observer is starting for chain %d", ob.Chain().ChainId)

bg.Work(ctx, ob.WatchInbound, bg.WithName("WatchInbound"), bg.WithLogger(ob.Logger().Inbound))
Expand Down
111 changes: 111 additions & 0 deletions zetaclient/orchestrator/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@ package orchestrator

import (
"context"
"fmt"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/pkg/errors"
"github.com/rs/zerolog"

"github.com/zeta-chain/zetacore/zetaclient/chains/base"
btcobserver "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/observer"
"github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/rpc"
btcsigner "github.com/zeta-chain/zetacore/zetaclient/chains/bitcoin/signer"
evmobserver "github.com/zeta-chain/zetacore/zetaclient/chains/evm/observer"
evmsigner "github.com/zeta-chain/zetacore/zetaclient/chains/evm/signer"
"github.com/zeta-chain/zetacore/zetaclient/chains/interfaces"
"github.com/zeta-chain/zetacore/zetaclient/config"
zctx "github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/db"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
"github.com/zeta-chain/zetacore/zetaclient/zetacore"
)

// backwards compatibility
const btcDatabaseFilename = "btc_chain_client"

// CreateSignerMap creates a map of interfaces.ChainSigner (by chainID) for all chains in the config.
// Note that signer construction failure for a chain does not prevent the creation of signers for other chains.
func CreateSignerMap(
Expand Down Expand Up @@ -168,3 +178,104 @@ func (m *signerMap) unsetMissing(enabledChains []int64, logger zerolog.Logger) i

return removed
}

// CreateChainObserverMap creates a map of ChainObservers for all chains in the config
func CreateChainObserverMap(
ctx context.Context,
client *zetacore.Client,
tss interfaces.TSSSigner,
dbpath string,
logger base.Logger,
ts *metrics.TelemetryServer,
) (map[int64]interfaces.ChainObserver, error) {
observerMap := make(map[int64]interfaces.ChainObserver)

Check warning on line 191 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L190-L191

Added lines #L190 - L191 were not covered by tests

app, err := zctx.FromContext(ctx)
if err != nil {
return nil, errors.Wrapf(err, "failed to get app context")

Check warning on line 195 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L193-L195

Added lines #L193 - L195 were not covered by tests
}

// EVM observers
for _, evmConfig := range app.Config().GetAllEVMConfigs() {
if evmConfig.Chain.IsZetaChain() {
continue

Check warning on line 201 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L199-L201

Added lines #L199 - L201 were not covered by tests
}

chainParams, found := app.GetEVMChainParams(evmConfig.Chain.ChainId)
if !found {
logger.Std.Error().Msgf("ChainParam not found for chain %s", evmConfig.Chain.String())
continue

Check warning on line 207 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L204-L207

Added lines #L204 - L207 were not covered by tests
}

// create EVM client
evmClient, err := ethclient.Dial(evmConfig.Endpoint)
if err != nil {
logger.Std.Error().Err(err).Msgf("error dailing endpoint %q", evmConfig.Endpoint)
continue

Check warning on line 214 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L211-L214

Added lines #L211 - L214 were not covered by tests
}

chainName := evmConfig.Chain.ChainName.String()

Check warning on line 217 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L217

Added line #L217 was not covered by tests

database, err := db.NewFromSqlite(dbpath, chainName, true)
if err != nil {
logger.Std.Error().Err(err).Msgf("Unable to open a database for EVM chain %q", chainName)

Check warning on line 221 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L219-L221

Added lines #L219 - L221 were not covered by tests
}

// create EVM chain observer
observer, err := evmobserver.NewObserver(
ctx,
evmConfig,
evmClient,
*chainParams,
client,
tss,
database,
logger,
ts,
)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewObserver error for EVM chain %s", evmConfig.Chain.String())
continue

Check warning on line 238 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L225-L238

Added lines #L225 - L238 were not covered by tests
}
observerMap[evmConfig.Chain.ChainId] = observer

Check warning on line 240 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L240

Added line #L240 was not covered by tests
}

// create BTC chain observer
btcChain, btcConfig, btcEnabled := app.GetBTCChainAndConfig()
if !btcEnabled {
return observerMap, nil

Check warning on line 246 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L244-L246

Added lines #L244 - L246 were not covered by tests
}

_, btcChainParams, found := app.GetBTCChainParams()
if !found {
return nil, fmt.Errorf("BTC is enabled, but chains params not found")

Check warning on line 251 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L249-L251

Added lines #L249 - L251 were not covered by tests
}

btcClient, err := rpc.NewRPCClient(btcConfig)
if err != nil {
return nil, errors.Wrap(err, "unable to create rpc client for BTC chain")

Check warning on line 256 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L254-L256

Added lines #L254 - L256 were not covered by tests
}

btcDatabase, err := db.NewFromSqlite(dbpath, btcDatabaseFilename, true)
if err != nil {
return nil, errors.Wrap(err, "unable to open a database for BTC chain")

Check warning on line 261 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L259-L261

Added lines #L259 - L261 were not covered by tests
}

btcObserver, err := btcobserver.NewObserver(
btcChain,
btcClient,
*btcChainParams,
client,
tss,
btcDatabase,
logger,
ts,
)
if err != nil {
logger.Std.Error().Err(err).Msgf("NewObserver error for BTC chain %s", btcChain.ChainName.String())
} else {
observerMap[btcChain.ChainId] = btcObserver

Check warning on line 277 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L264-L277

Added lines #L264 - L277 were not covered by tests
}

return observerMap, nil

Check warning on line 280 in zetaclient/orchestrator/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/orchestrator/bootstrap.go#L280

Added line #L280 was not covered by tests
}

0 comments on commit 6932b13

Please sign in to comment.