Skip to content

Commit

Permalink
Add ctx to evm observer
Browse files Browse the repository at this point in the history
  • Loading branch information
swift1337 committed Jul 4, 2024
1 parent 0d3699f commit d4e7db3
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 91 deletions.
119 changes: 78 additions & 41 deletions zetaclient/chains/evm/observer/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,27 @@ import (
"github.com/zeta-chain/zetacore/zetaclient/chains/evm"
"github.com/zeta-chain/zetacore/zetaclient/compliance"
"github.com/zeta-chain/zetacore/zetaclient/config"
zctx "github.com/zeta-chain/zetacore/zetaclient/context"
"github.com/zeta-chain/zetacore/zetaclient/metrics"
clienttypes "github.com/zeta-chain/zetacore/zetaclient/types"
"github.com/zeta-chain/zetacore/zetaclient/zetacore"
)

// WatchInbound watches evm chain for incoming txs and post votes to zetacore
// TODO(revamp): move ticker function to a separate file
func (ob *Observer) WatchInbound() {
func (ob *Observer) WatchInbound(ctx context.Context) error {
app, err := zctx.FromContext(ctx)
if err != nil {
return err
}

ticker, err := clienttypes.NewDynamicTicker(
fmt.Sprintf("EVM_WatchInbound_%d", ob.Chain().ChainId),
ob.GetChainParams().InboundTicker,
)
if err != nil {
ob.Logger().Inbound.Error().Err(err).Msg("error creating ticker")
return
return err
}
defer ticker.Stop()

Expand All @@ -51,60 +57,65 @@ func (ob *Observer) WatchInbound() {
for {
select {
case <-ticker.C():
if !ob.AppContext().IsInboundObservationEnabled(ob.GetChainParams()) {
if !app.IsInboundObservationEnabled(ob.GetChainParams()) {
sampledLogger.Info().
Msgf("WatchInbound: inbound observation is disabled for chain %d", ob.Chain().ChainId)
continue
}
err := ob.ObserveInbound(sampledLogger)
err := ob.ObserveInbound(ctx, sampledLogger)
if err != nil {
ob.Logger().Inbound.Err(err).Msg("WatchInbound: observeInbound error")
}
ticker.UpdateInterval(ob.GetChainParams().InboundTicker, ob.Logger().Inbound)
case <-ob.StopChannel():
ob.Logger().Inbound.Info().Msgf("WatchInbound stopped for chain %d", ob.Chain().ChainId)
return
return nil
}
}
}

// WatchInboundTracker gets a list of Inbound tracker suggestions from zeta-core at each tick and tries to check if the in-tx was confirmed.
// If it was, it tries to broadcast the confirmation vote. If this zeta client has previously broadcast the vote, the tx would be rejected
// TODO(revamp): move inbound tracker function to a separate file
func (ob *Observer) WatchInboundTracker() {
func (ob *Observer) WatchInboundTracker(ctx context.Context) error {
app, err := zctx.FromContext(ctx)
if err != nil {
return err
}

ticker, err := clienttypes.NewDynamicTicker(
fmt.Sprintf("EVM_WatchInboundTracker_%d", ob.Chain().ChainId),
ob.GetChainParams().InboundTicker,
)
if err != nil {
ob.Logger().Inbound.Err(err).Msg("error creating ticker")
return
return err
}
defer ticker.Stop()

ob.Logger().Inbound.Info().Msgf("Inbound tracker watcher started for chain %d", ob.Chain().ChainId)
for {
select {
case <-ticker.C():
if !ob.AppContext().IsInboundObservationEnabled(ob.GetChainParams()) {
if !app.IsInboundObservationEnabled(ob.GetChainParams()) {
continue
}
err := ob.ProcessInboundTrackers()
err := ob.ProcessInboundTrackers(ctx)
if err != nil {
ob.Logger().Inbound.Err(err).Msg("ProcessInboundTrackers error")
}
ticker.UpdateInterval(ob.GetChainParams().InboundTicker, ob.Logger().Inbound)
case <-ob.StopChannel():
ob.Logger().Inbound.Info().Msgf("WatchInboundTracker stopped for chain %d", ob.Chain().ChainId)
return
return nil
}
}
}

// ProcessInboundTrackers processes inbound trackers from zetacore
// TODO(revamp): move inbound tracker function to a separate file
func (ob *Observer) ProcessInboundTrackers() error {
trackers, err := ob.ZetacoreClient().GetInboundTrackersForChain(ob.Chain().ChainId)
func (ob *Observer) ProcessInboundTrackers(ctx context.Context) error {
trackers, err := ob.ZetacoreClient().GetInboundTrackersForChain(ctx, ob.Chain().ChainId)
if err != nil {
return err
}
Expand Down Expand Up @@ -134,11 +145,11 @@ func (ob *Observer) ProcessInboundTrackers() error {
// check and vote on inbound tx
switch tracker.CoinType {
case coin.CoinType_Zeta:
_, err = ob.CheckAndVoteInboundTokenZeta(tx, receipt, true)
_, err = ob.CheckAndVoteInboundTokenZeta(ctx, tx, receipt, true)
case coin.CoinType_ERC20:
_, err = ob.CheckAndVoteInboundTokenERC20(tx, receipt, true)
_, err = ob.CheckAndVoteInboundTokenERC20(ctx, tx, receipt, true)
case coin.CoinType_Gas:
_, err = ob.CheckAndVoteInboundTokenGas(tx, receipt, true)
_, err = ob.CheckAndVoteInboundTokenGas(ctx, tx, receipt, true)
default:
return fmt.Errorf(
"unknown coin type %s for inbound %s chain %d",
Expand All @@ -155,7 +166,7 @@ func (ob *Observer) ProcessInboundTrackers() error {
}

// ObserveInbound observes the evm chain for inbounds and posts votes to zetacore
func (ob *Observer) ObserveInbound(sampledLogger zerolog.Logger) error {
func (ob *Observer) ObserveInbound(ctx context.Context, sampledLogger zerolog.Logger) error {
// get and update latest block height
blockNumber, err := ob.evmClient.BlockNumber(context.Background())
if err != nil {
Expand Down Expand Up @@ -192,13 +203,13 @@ func (ob *Observer) ObserveInbound(sampledLogger zerolog.Logger) error {
startBlock, toBlock := ob.calcBlockRangeToScan(confirmedBlockNum, lastScanned, config.MaxBlocksPerPeriod)

// task 1: query evm chain for zeta sent logs (read at most 100 blocks in one go)
lastScannedZetaSent := ob.ObserveZetaSent(startBlock, toBlock)
lastScannedZetaSent, _ := ob.ObserveZetaSent(ctx, startBlock, toBlock)

// task 2: query evm chain for deposited logs (read at most 100 blocks in one go)
lastScannedDeposited := ob.ObserveERC20Deposited(startBlock, toBlock)
lastScannedDeposited := ob.ObserveERC20Deposited(ctx, startBlock, toBlock)

// task 3: query the incoming tx to TSS address (read at most 100 blocks in one go)
lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock)
lastScannedTssRecvd, _ := ob.ObserverTSSReceive(ctx, startBlock, toBlock)

// note: using lowest height for all 3 events is not perfect, but it's simple and good enough
lastScannedLowest := lastScannedZetaSent
Expand All @@ -225,12 +236,18 @@ func (ob *Observer) ObserveInbound(sampledLogger zerolog.Logger) error {

// ObserveZetaSent queries the ZetaSent event from the connector contract and posts to zetacore
// returns the last block successfully scanned
func (ob *Observer) ObserveZetaSent(startBlock, toBlock uint64) uint64 {
func (ob *Observer) ObserveZetaSent(ctx context.Context, startBlock, toBlock uint64) (uint64, error) {
app, err := zctx.FromContext(ctx)
if err != nil {
return 0, err
}

// filter ZetaSent logs
addrConnector, connector, err := ob.GetConnectorContract()
if err != nil {
ob.Logger().Chain.Warn().Err(err).Msgf("ObserveZetaSent: GetConnectorContract error:")
return startBlock - 1 // lastScanned
// lastScanned
return startBlock - 1, err
}
iter, err := connector.FilterZetaSent(&bind.FilterOpts{
Start: startBlock,
Expand All @@ -240,7 +257,8 @@ func (ob *Observer) ObserveZetaSent(startBlock, toBlock uint64) uint64 {
if err != nil {
ob.Logger().Chain.Warn().Err(err).Msgf(
"ObserveZetaSent: FilterZetaSent error from block %d to %d for chain %d", startBlock, toBlock, ob.Chain().ChainId)
return startBlock - 1 // lastScanned
// lastScanned
return startBlock - 1, err
}

// collect and sort events by block number, then tx index, then log index (ascending)
Expand Down Expand Up @@ -286,25 +304,27 @@ func (ob *Observer) ObserveZetaSent(startBlock, toBlock uint64) uint64 {
}
guard[event.Raw.TxHash.Hex()] = true

msg := ob.BuildInboundVoteMsgForZetaSentEvent(event)
msg := ob.BuildInboundVoteMsgForZetaSentEvent(app, event)
if msg != nil {
_, err = ob.PostVoteInbound(
ctx,
msg,
coin.CoinType_Zeta,
zetacore.PostVoteInboundMessagePassingExecutionGasLimit,
)
if err != nil {
return beingScanned - 1 // we have to re-scan from this block next time
// we have to re-scan from this block next time
return beingScanned - 1, err
}
}
}
// successful processed all events in [startBlock, toBlock]
return toBlock
return toBlock, nil
}

// ObserveERC20Deposited queries the ERC20CustodyDeposited event from the ERC20Custody contract and posts to zetacore
// returns the last block successfully scanned
func (ob *Observer) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 {
func (ob *Observer) ObserveERC20Deposited(ctx context.Context, startBlock, toBlock uint64) uint64 {
// filter ERC20CustodyDeposited logs
addrCustody, erc20custodyContract, err := ob.GetERC20CustodyContract()
if err != nil {
Expand Down Expand Up @@ -376,7 +396,7 @@ func (ob *Observer) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 {

msg := ob.BuildInboundVoteMsgForDepositedEvent(event, sender)
if msg != nil {
_, err = ob.PostVoteInbound(msg, coin.CoinType_ERC20, zetacore.PostVoteInboundExecutionGasLimit)
_, err = ob.PostVoteInbound(ctx, msg, coin.CoinType_ERC20, zetacore.PostVoteInboundExecutionGasLimit)
if err != nil {
return beingScanned - 1 // we have to re-scan from this block next time
}
Expand All @@ -388,42 +408,54 @@ func (ob *Observer) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 {

// ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetacore
// returns the last block successfully scanned
func (ob *Observer) ObserverTSSReceive(startBlock, toBlock uint64) uint64 {
func (ob *Observer) ObserverTSSReceive(ctx context.Context, startBlock, toBlock uint64) (uint64, error) {
app, err := zctx.FromContext(ctx)
if err != nil {
return 0, err
}

// query incoming gas asset
for bn := startBlock; bn <= toBlock; bn++ {
// post new block header (if any) to zetacore and ignore error
// TODO: consider having a independent ticker(from TSS scaning) for posting block headers
// https://github.com/zeta-chain/node/issues/1847
blockHeaderVerification, found := ob.AppContext().GetBlockHeaderEnabledChains(ob.Chain().ChainId)
blockHeaderVerification, found := app.GetBlockHeaderEnabledChains(ob.Chain().ChainId)
if found && blockHeaderVerification.Enabled {
// post block header for supported chains
// TODO: move this logic in its own routine
// https://github.com/zeta-chain/node/issues/2204
err := ob.postBlockHeader(toBlock)
err := ob.postBlockHeader(ctx, toBlock)
if err != nil {
ob.Logger().Inbound.Error().Err(err).Msg("error posting block header")
}
}

// observe TSS received gas token in block 'bn'
err := ob.ObserveTSSReceiveInBlock(bn)
err := ob.ObserveTSSReceiveInBlock(ctx, bn)
if err != nil {
ob.Logger().Inbound.Error().
Err(err).
Msgf("ObserverTSSReceive: error observing TSS received token in block %d for chain %d", bn, ob.Chain().ChainId)
return bn - 1 // we have to re-scan from this block next time
return bn - 1, nil // we have to re-scan from this block next time
}
}

// successful processed all gas asset deposits in [startBlock, toBlock]
return toBlock
return toBlock, nil
}

// CheckAndVoteInboundTokenZeta checks and votes on the given inbound Zeta token
func (ob *Observer) CheckAndVoteInboundTokenZeta(
ctx context.Context,
tx *ethrpc.Transaction,
receipt *ethtypes.Receipt,
vote bool,
) (string, error) {
app, err := zctx.FromContext(ctx)
if err != nil {
return "", err
}

// check confirmations
if confirmed := ob.HasEnoughConfirmations(receipt, ob.LastBlock()); !confirmed {
return "", fmt.Errorf(
Expand All @@ -447,7 +479,7 @@ func (ob *Observer) CheckAndVoteInboundTokenZeta(
// sanity check tx event
err = evm.ValidateEvmTxLog(&event.Raw, addrConnector, tx.Hash, evm.TopicsZetaSent)
if err == nil {
msg = ob.BuildInboundVoteMsgForZetaSentEvent(event)
msg = ob.BuildInboundVoteMsgForZetaSentEvent(app, event)
} else {
ob.Logger().Inbound.Error().Err(err).Msgf("CheckEvmTxLog error on inbound %s chain %d", tx.Hash, ob.Chain().ChainId)
return "", err
Expand All @@ -461,14 +493,15 @@ func (ob *Observer) CheckAndVoteInboundTokenZeta(
return "", nil
}
if vote {
return ob.PostVoteInbound(msg, coin.CoinType_Zeta, zetacore.PostVoteInboundMessagePassingExecutionGasLimit)
return ob.PostVoteInbound(ctx, msg, coin.CoinType_Zeta, zetacore.PostVoteInboundMessagePassingExecutionGasLimit)
}

return msg.Digest(), nil
}

// CheckAndVoteInboundTokenERC20 checks and votes on the given inbound ERC20 token
func (ob *Observer) CheckAndVoteInboundTokenERC20(
ctx context.Context,
tx *ethrpc.Transaction,
receipt *ethtypes.Receipt,
vote bool,
Expand Down Expand Up @@ -511,14 +544,15 @@ func (ob *Observer) CheckAndVoteInboundTokenERC20(
return "", nil
}
if vote {
return ob.PostVoteInbound(msg, coin.CoinType_ERC20, zetacore.PostVoteInboundExecutionGasLimit)
return ob.PostVoteInbound(ctx, msg, coin.CoinType_ERC20, zetacore.PostVoteInboundExecutionGasLimit)
}

return msg.Digest(), nil
}

// CheckAndVoteInboundTokenGas checks and votes on the given inbound gas token
func (ob *Observer) CheckAndVoteInboundTokenGas(
ctx context.Context,
tx *ethrpc.Transaction,
receipt *ethtypes.Receipt,
vote bool,
Expand Down Expand Up @@ -549,21 +583,23 @@ func (ob *Observer) CheckAndVoteInboundTokenGas(
return "", nil
}
if vote {
return ob.PostVoteInbound(msg, coin.CoinType_Gas, zetacore.PostVoteInboundExecutionGasLimit)
return ob.PostVoteInbound(ctx, msg, coin.CoinType_Gas, zetacore.PostVoteInboundExecutionGasLimit)
}

return msg.Digest(), nil
}

// PostVoteInbound posts a vote for the given vote message
func (ob *Observer) PostVoteInbound(
ctx context.Context,
msg *types.MsgVoteInbound,
coinType coin.CoinType,
retryGasLimit uint64,
) (string, error) {
txHash := msg.InboundHash
chainID := ob.Chain().ChainId
zetaHash, ballot, err := ob.ZetacoreClient().PostVoteInbound(zetacore.PostVoteInboundGasLimit, retryGasLimit, msg)
zetaHash, ballot, err := ob.ZetacoreClient().
PostVoteInbound(ctx, zetacore.PostVoteInboundGasLimit, retryGasLimit, msg)
if err != nil {
ob.Logger().Inbound.Err(err).
Msgf("inbound detected: error posting vote for chain %d token %s inbound %s", chainID, coinType, txHash)
Expand Down Expand Up @@ -640,6 +676,7 @@ func (ob *Observer) BuildInboundVoteMsgForDepositedEvent(

// BuildInboundVoteMsgForZetaSentEvent builds a inbound vote message for a ZetaSent event
func (ob *Observer) BuildInboundVoteMsgForZetaSentEvent(
appContext *zctx.AppContext,
event *zetaconnector.ZetaConnectorNonEthZetaSent,
) *types.MsgVoteInbound {
destChain := chains.GetChainFromChainID(event.DestinationChainId.Int64())
Expand All @@ -658,7 +695,7 @@ func (ob *Observer) BuildInboundVoteMsgForZetaSentEvent(
}

if !destChain.IsZetaChain() {
paramsDest, found := ob.AppContext().GetEVMChainParams(destChain.ChainId)
paramsDest, found := appContext.GetEVMChainParams(destChain.ChainId)
if !found {
ob.Logger().Inbound.Warn().
Msgf("chain id not present in EVMChainParams %d", event.DestinationChainId.Int64())
Expand Down Expand Up @@ -744,7 +781,7 @@ func (ob *Observer) BuildInboundVoteMsgForTokenSentToTSS(
}

// ObserveTSSReceiveInBlock queries the incoming gas asset to TSS address in a single block and posts votes
func (ob *Observer) ObserveTSSReceiveInBlock(blockNumber uint64) error {
func (ob *Observer) ObserveTSSReceiveInBlock(ctx context.Context, blockNumber uint64) error {
block, err := ob.GetBlockByNumberCached(blockNumber)
if err != nil {
return errors.Wrapf(err, "error getting block %d for chain %d", blockNumber, ob.Chain().ChainId)
Expand All @@ -758,7 +795,7 @@ func (ob *Observer) ObserveTSSReceiveInBlock(blockNumber uint64) error {
return errors.Wrapf(err, "error getting receipt for inbound %s chain %d", tx.Hash, ob.Chain().ChainId)
}

_, err = ob.CheckAndVoteInboundTokenGas(&tx, receipt, true)
_, err = ob.CheckAndVoteInboundTokenGas(ctx, &tx, receipt, true)
if err != nil {
return errors.Wrapf(
err,
Expand Down
Loading

0 comments on commit d4e7db3

Please sign in to comment.