Skip to content

Commit

Permalink
created sub function to observe incoming deposits to TSS address in o…
Browse files Browse the repository at this point in the history
…ne block
  • Loading branch information
ws4charlie committed Mar 9, 2024
1 parent f8e8565 commit 2e12c4c
Show file tree
Hide file tree
Showing 11 changed files with 2,445 additions and 128 deletions.
2 changes: 1 addition & 1 deletion zetaclient/bitcoin/bitcoin_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
func MockBTCClientMainnet() *BTCChainClient {
return &BTCChainClient{
chain: common.BtcMainnetChain(),
zetaClient: stub.NewZetaCoreBridge(),
zetaClient: stub.NewMockZetaCoreBridge(),
Tss: stub.NewTSSMainnet(),
}
}
Expand Down
65 changes: 24 additions & 41 deletions zetaclient/evm/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ type Log struct {
type ChainClient struct {
chain common.Chain
evmClient interfaces.EVMRPCClient
evmJSONRPC interfaces.EVMJSONRPCClient
zetaClient interfaces.ZetaCoreBridger
Tss interfaces.TSSSigner
evmJSONRPC *ethrpc.EthRPC
lastBlockScanned uint64
lastBlock uint64
BlockTimeExternalChain uint64 // block time in seconds
Expand All @@ -81,7 +81,6 @@ type ChainClient struct {
outTxPendingTransactions map[string]*ethtypes.Transaction
outTXConfirmedReceipts map[string]*ethtypes.Receipt
outTXConfirmedTransactions map[string]*ethtypes.Transaction
OutTxChan chan OutTx // send to this channel if you want something back!
stop chan struct{}
logger Log
coreContext *corecontext.ZetaCoreContext
Expand Down Expand Up @@ -128,7 +127,6 @@ func NewEVMChainClient(
ob.outTxPendingTransactions = make(map[string]*ethtypes.Transaction)
ob.outTXConfirmedReceipts = make(map[string]*ethtypes.Receipt)
ob.outTXConfirmedTransactions = make(map[string]*ethtypes.Transaction)
ob.OutTxChan = make(chan OutTx, 100)

ob.logger.ChainLogger.Info().Msgf("Chain %s endpoint %s", ob.chain.ChainName.String(), evmCfg.Endpoint)
client, err := ethclient.Dial(evmCfg.Endpoint)
Expand Down Expand Up @@ -176,13 +174,13 @@ func (ob *ChainClient) WithLogger(logger zerolog.Logger) {
}
}

func (ob *ChainClient) WithEvmClient(client *ethclient.Client) {
func (ob *ChainClient) WithEvmClient(client interfaces.EVMRPCClient) {
ob.Mu.Lock()
defer ob.Mu.Unlock()
ob.evmClient = client
}

func (ob *ChainClient) WithEvmJSONRPC(client *ethrpc.EthRPC) {
func (ob *ChainClient) WithEvmJSONRPC(client interfaces.EVMJSONRPCClient) {
ob.Mu.Lock()
defer ob.Mu.Unlock()
ob.evmJSONRPC = client
Expand Down Expand Up @@ -946,13 +944,13 @@ func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error {
startBlock, toBlock := ob.calcBlockRangeToScan(confirmedBlockNum, lastScanned, config.MaxBlocksPerPeriod)

// task 1: query evm chain for zeta sent logs (read at most 100 blocks in one go)
lastScannedZetaSent := ob.observeZetaSent(startBlock, toBlock)
lastScannedZetaSent := ob.ObserveZetaSent(startBlock, toBlock)

// task 2: query evm chain for deposited logs (read at most 100 blocks in one go)
lastScannedDeposited := ob.observeERC20Deposited(startBlock, toBlock)
lastScannedDeposited := ob.ObserveERC20Deposited(startBlock, toBlock)

// task 3: query the incoming tx to TSS address (read at most 100 blocks in one go)
lastScannedTssRecvd := ob.observerTSSReceive(startBlock, toBlock, flags)
lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock, flags)

// note: using lowest height for all 3 events is not perfect, but it's simple and good enough
lastScannedLowest := lastScannedZetaSent
Expand All @@ -975,13 +973,13 @@ func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error {
return nil
}

// observeZetaSent queries the ZetaSent event from the connector contract and posts to zetabridge
// ObserveZetaSent queries the ZetaSent event from the connector contract and posts to zetabridge
// returns the last block successfully scanned
func (ob *ChainClient) observeZetaSent(startBlock, toBlock uint64) uint64 {
func (ob *ChainClient) ObserveZetaSent(startBlock, toBlock uint64) uint64 {
// filter ZetaSent logs
addrConnector, connector, err := ob.GetConnectorContract()
if err != nil {
ob.logger.ChainLogger.Warn().Err(err).Msgf("observeZetaSent: GetConnectorContract error:")
ob.logger.ChainLogger.Warn().Err(err).Msgf("ObserveZetaSent: GetConnectorContract error:")
return startBlock - 1 // lastScanned
}
iter, err := connector.FilterZetaSent(&bind.FilterOpts{
Expand All @@ -991,7 +989,7 @@ func (ob *ChainClient) observeZetaSent(startBlock, toBlock uint64) uint64 {
}, []ethcommon.Address{}, []*big.Int{})
if err != nil {
ob.logger.ChainLogger.Warn().Err(err).Msgf(
"observeZetaSent: FilterZetaSent error from block %d to %d for chain %d", startBlock, toBlock, ob.chain.ChainId)
"ObserveZetaSent: FilterZetaSent error from block %d to %d for chain %d", startBlock, toBlock, ob.chain.ChainId)
return startBlock - 1 // lastScanned
}

Expand All @@ -1004,7 +1002,7 @@ func (ob *ChainClient) observeZetaSent(startBlock, toBlock uint64) uint64 {
events = append(events, iter.Event)
continue
}
ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("observeZetaSent: invalid ZetaSent event in tx %s on chain %d at height %d",
ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("ObserveZetaSent: invalid ZetaSent event in tx %s on chain %d at height %d",
iter.Event.Raw.TxHash.Hex(), ob.chain.ChainId, iter.Event.Raw.BlockNumber)
}
sort.SliceStable(events, func(i, j int) bool {
Expand All @@ -1030,7 +1028,7 @@ func (ob *ChainClient) observeZetaSent(startBlock, toBlock uint64) uint64 {
}
// guard against multiple events in the same tx
if guard[event.Raw.TxHash.Hex()] {
ob.logger.ExternalChainWatcher.Warn().Msgf("observeZetaSent: multiple remote call events detected in tx %s", event.Raw.TxHash)
ob.logger.ExternalChainWatcher.Warn().Msgf("ObserveZetaSent: multiple remote call events detected in tx %s", event.Raw.TxHash)
continue
}
guard[event.Raw.TxHash.Hex()] = true
Expand All @@ -1047,13 +1045,13 @@ func (ob *ChainClient) observeZetaSent(startBlock, toBlock uint64) uint64 {
return toBlock
}

// observeERC20Deposited queries the ERC20CustodyDeposited event from the ERC20Custody contract and posts to zetabridge
// ObserveERC20Deposited queries the ERC20CustodyDeposited event from the ERC20Custody contract and posts to zetabridge
// returns the last block successfully scanned
func (ob *ChainClient) observeERC20Deposited(startBlock, toBlock uint64) uint64 {
func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 {
// filter ERC20CustodyDeposited logs
addrCustody, erc20custodyContract, err := ob.GetERC20CustodyContract()
if err != nil {
ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("observeERC20Deposited: GetERC20CustodyContract error:")
ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("ObserveERC20Deposited: GetERC20CustodyContract error:")
return startBlock - 1 // lastScanned
}
iter, err := erc20custodyContract.FilterDeposited(&bind.FilterOpts{
Expand All @@ -1063,7 +1061,7 @@ func (ob *ChainClient) observeERC20Deposited(startBlock, toBlock uint64) uint64
}, []ethcommon.Address{})
if err != nil {
ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf(
"observeERC20Deposited: FilterDeposited error from block %d to %d for chain %d", startBlock, toBlock, ob.chain.ChainId)
"ObserveERC20Deposited: FilterDeposited error from block %d to %d for chain %d", startBlock, toBlock, ob.chain.ChainId)
return startBlock - 1 // lastScanned
}

Expand All @@ -1076,7 +1074,7 @@ func (ob *ChainClient) observeERC20Deposited(startBlock, toBlock uint64) uint64
events = append(events, iter.Event)
continue
}
ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("observeERC20Deposited: invalid Deposited event in tx %s on chain %d at height %d",
ob.logger.ExternalChainWatcher.Warn().Err(err).Msgf("ObserveERC20Deposited: invalid Deposited event in tx %s on chain %d at height %d",
iter.Event.Raw.TxHash.Hex(), ob.chain.ChainId, iter.Event.Raw.BlockNumber)
}
sort.SliceStable(events, func(i, j int) bool {
Expand All @@ -1103,14 +1101,14 @@ func (ob *ChainClient) observeERC20Deposited(startBlock, toBlock uint64) uint64
tx, _, err := ob.TransactionByHash(event.Raw.TxHash.Hex())
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf(
"observeERC20Deposited: error getting transaction for intx %s chain %d", event.Raw.TxHash, ob.chain.ChainId)
"ObserveERC20Deposited: error getting transaction for intx %s chain %d", event.Raw.TxHash, ob.chain.ChainId)
return beingScanned - 1 // we have to re-scan from this block next time
}
sender := ethcommon.HexToAddress(tx.From)

// guard against multiple events in the same tx
if guard[event.Raw.TxHash.Hex()] {
ob.logger.ExternalChainWatcher.Warn().Msgf("observeERC20Deposited: multiple remote call events detected in tx %s", event.Raw.TxHash)
ob.logger.ExternalChainWatcher.Warn().Msgf("ObserveERC20Deposited: multiple remote call events detected in tx %s", event.Raw.TxHash)
continue
}
guard[event.Raw.TxHash.Hex()] = true
Expand All @@ -1127,9 +1125,9 @@ func (ob *ChainClient) observeERC20Deposited(startBlock, toBlock uint64) uint64
return toBlock
}

// observerTSSReceive queries the incoming gas asset to TSS address and posts to zetabridge
// ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetabridge
// returns the last block successfully scanned
func (ob *ChainClient) observerTSSReceive(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 {
func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 {
if !ob.GetChainParams().IsSupported {
return startBlock - 1 // lastScanned
}
Expand All @@ -1147,27 +1145,12 @@ func (ob *ChainClient) observerTSSReceive(startBlock, toBlock uint64, flags obse
}
}

// TODO: we can track the total number of 'getBlockByNumber' RPC calls made
block, err := ob.GetBlockByNumberCached(bn)
// observe TSS received gas token in block 'bn'
err := ob.ObserveTSSReceiveInBlock(bn)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("observerTSSReceive: error getting block %d for chain %d", bn, ob.chain.ChainId)
ob.logger.ExternalChainWatcher.Error().Err(err).Msgf("ObserverTSSReceive: error observing TSS received token in block %d for chain %d", bn, ob.chain.ChainId)
return bn - 1 // we have to re-scan from this block next time
}
for i := range block.Transactions {
tx := block.Transactions[i]
if ethcommon.HexToAddress(tx.To) == ob.Tss.EVMAddress() {
receipt, err := ob.evmClient.TransactionReceipt(context.Background(), ethcommon.HexToHash(tx.Hash))
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msgf("observerTSSReceive: error getting receipt for intx %s chain %d", tx.Hash, ob.chain.ChainId)
return bn - 1 // we have to re-scan from this block next time
}
_, err = ob.CheckAndVoteInboundTokenGas(&tx, receipt, true)
if err != nil {
ob.logger.ExternalChainWatcher.Err(err).Msgf("observerTSSReceive: error checking and voting inbound gas asset for intx %s chain %d", tx.Hash, ob.chain.ChainId)
return bn - 1 // we have to re-scan this block next time
}
}
}
}
// successful processed all gas asset deposits in [startBlock, toBlock]
return toBlock
Expand Down
2 changes: 1 addition & 1 deletion zetaclient/evm/evm_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func getEVMRPC(endpoint string) (interfaces.EVMRPCClient, *big.Int, ethtypes.Sig
if endpoint == stub.EVMRPCEnabled {
chainID := big.NewInt(common.BscMainnetChain().ChainId)
ethSigner := ethtypes.NewEIP155Signer(chainID)
client := stub.EvmClient{}
client := &stub.MockEvmClient{}
return client, chainID, ethSigner, nil
}

Expand Down
6 changes: 3 additions & 3 deletions zetaclient/evm/evm_signer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func getNewEvmChainClient() (*ChainClient, error) {
coreCTX := corecontext.NewZetaCoreContext(cfg)
appCTX := appcontext.NewAppContext(coreCTX, cfg)

return NewEVMChainClient(appCTX, stub.NewZetaCoreBridge(), tss, "", logger, evmcfg, ts)
return NewEVMChainClient(appCTX, stub.NewMockZetaCoreBridge(), tss, "", logger, evmcfg, ts)
}

func getNewOutTxProcessor() *outtxprocessor.Processor {
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestSigner_TryProcessOutTx(t *testing.T) {
mockChainClient, err := getNewEvmChainClient()
require.NoError(t, err)

evmSigner.TryProcessOutTx(cctx, processorManager, "123", mockChainClient, stub.NewZetaCoreBridge(), 123)
evmSigner.TryProcessOutTx(cctx, processorManager, "123", mockChainClient, stub.NewMockZetaCoreBridge(), 123)

//Check if cctx was signed and broadcasted
list := evmSigner.GetReportedTxList()
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestSigner_BroadcastOutTx(t *testing.T) {
tx, err := evmSigner.SignERC20WithdrawTx(txData)
require.NoError(t, err)

evmSigner.BroadcastOutTx(tx, cctx, zerolog.Logger{}, sdktypes.AccAddress{}, stub.NewZetaCoreBridge(), txData)
evmSigner.BroadcastOutTx(tx, cctx, zerolog.Logger{}, sdktypes.AccAddress{}, stub.NewMockZetaCoreBridge(), txData)

//Check if cctx was signed and broadcasted
list := evmSigner.GetReportedTxList()
Expand Down
22 changes: 22 additions & 0 deletions zetaclient/evm/inbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,25 @@ func (ob *ChainClient) BuildInboundVoteMsgForTokenSentToTSS(tx *ethrpc.Transacti
0, // not a smart contract call
)
}

// ObserveTSSReceiveInBlock queries the incoming gas asset to TSS address in a single block and posts votes
func (ob *ChainClient) ObserveTSSReceiveInBlock(blockNumber uint64) error {
block, err := ob.GetBlockByNumberCached(blockNumber)
if err != nil {
return errors.Wrapf(err, "error getting block %d for chain %d", blockNumber, ob.chain.ChainId)
}
for i := range block.Transactions {
tx := block.Transactions[i]
if ethcommon.HexToAddress(tx.To) == ob.Tss.EVMAddress() {
receipt, err := ob.evmClient.TransactionReceipt(context.Background(), ethcommon.HexToHash(tx.Hash))
if err != nil {
return errors.Wrapf(err, "error getting receipt for intx %s chain %d", tx.Hash, ob.chain.ChainId)
}
_, err = ob.CheckAndVoteInboundTokenGas(&tx, receipt, true)
if err != nil {
return errors.Wrapf(err, "error checking and voting inbound gas asset for intx %s chain %d", tx.Hash, ob.chain.ChainId)
}
}
}
return nil
}
Loading

0 comments on commit 2e12c4c

Please sign in to comment.