Skip to content

Commit

Permalink
feat: zetaclient read logs functions (#1118)
Browse files Browse the repository at this point in the history
  • Loading branch information
kingpinXD authored Sep 27, 2023
1 parent 065bdc1 commit b764bf7
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 100 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ require (
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
github.com/emicklei/proto v1.11.1
github.com/evmos/ethermint v0.22.0
github.com/nanmu42/etherscan-api v1.10.0
github.com/pkg/errors v0.9.1
github.com/rakyll/statik v0.1.7
github.com/zeta-chain/protocol-contracts v1.0.2-athens3.0.20230816152528-db7d2bf9144b
github.com/tendermint/crypto v0.0.0-20191022145703-50d29ede1e15
github.com/zeta-chain/protocol-contracts v1.0.2-athens3.0.20230816152528-db7d2bf9144b
gitlab.com/thorchain/tss/go-tss v0.0.0-00010101000000-000000000000
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc
gopkg.in/yaml.v2 v2.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2321,6 +2321,8 @@ github.com/mwitkow/grpc-proxy v0.0.0-20181017164139-0f1106ef9c76/go.mod h1:x5OoJ
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/nakabonne/nestif v0.3.0/go.mod h1:dI314BppzXjJ4HsCnbo7XzrJHPszZsjnk5wEBSYHI2c=
github.com/nakabonne/nestif v0.3.1/go.mod h1:9EtoZochLn5iUprVDmDjqGKPofoUEBL8U4Ngq6aY7OE=
github.com/nanmu42/etherscan-api v1.10.0 h1:8lAwKbaHEVzXK+cbLaApxbmp4Kai12WKEcY9CxqxKbY=
github.com/nanmu42/etherscan-api v1.10.0/go.mod h1:P8oAUxbYfsdfGXQnHCgjTDs4YbmasUVCtYAYc4rrZ5w=
github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (k msgServer) VoteOnObservedInboundTx(goCtx context.Context, msg *types.Msg
}
commit()
cctx.CctxStatus.ChangeStatus(types.CctxStatus_PendingOutbound, "")
k.RemoveInTxTrackerIfExists(ctx, cctx.InboundTxParams.SenderChainId, cctx.InboundTxParams.InboundTxObservedHash)
return &types.MsgVoteOnObservedInboundTxResponse{}, nil
}
}
16 changes: 12 additions & 4 deletions x/crosschain/keeper/keeper_in_tx_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ func (k Keeper) GetInTxTracker(ctx sdk.Context, chainID int64, txHash string) (v
k.cdc.MustUnmarshal(b, &val)
return val, true
}

func (k Keeper) RemoveInTxTrackerIfExists(ctx sdk.Context, chainID int64, txHash string) {
key := getInTrackerKey(chainID, txHash)
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.InTxTrackerKeyPrefix))
if store.Has(types.KeyPrefix(key)) {
store.Delete(types.KeyPrefix(key))
}
}
func (k Keeper) GetAllInTxTracker(ctx sdk.Context) (list []types.InTxTracker) {
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.InTxTrackerKeyPrefix))
iterator := sdk.KVStorePrefixIterator(store, []byte{})
Expand All @@ -47,9 +55,9 @@ func (k Keeper) GetAllInTxTracker(ctx sdk.Context) (list []types.InTxTracker) {
return list
}

func (k Keeper) GetAllInTxTrackerForChain(ctx sdk.Context, chainId int64) (list []types.InTxTracker) {
func (k Keeper) GetAllInTxTrackerForChain(ctx sdk.Context, chainID int64) (list []types.InTxTracker) {
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.InTxTrackerKeyPrefix))
iterator := sdk.KVStorePrefixIterator(store, []byte(fmt.Sprintf("%d-", chainId)))
iterator := sdk.KVStorePrefixIterator(store, []byte(fmt.Sprintf("%d-", chainID)))
defer iterator.Close()
for ; iterator.Valid(); iterator.Next() {
var val types.InTxTracker
Expand All @@ -59,9 +67,9 @@ func (k Keeper) GetAllInTxTrackerForChain(ctx sdk.Context, chainId int64) (list
return list
}

func (k Keeper) GetAllInTxTrackerForChainPaginated(ctx sdk.Context, chainId int64, pagination *query.PageRequest) (inTxTrackers []types.InTxTracker, pageRes *query.PageResponse, err error) {
func (k Keeper) GetAllInTxTrackerForChainPaginated(ctx sdk.Context, chainID int64, pagination *query.PageRequest) (inTxTrackers []types.InTxTracker, pageRes *query.PageResponse, err error) {
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(fmt.Sprintf("%s", types.InTxTrackerKeyPrefix)))
chainStore := prefix.NewStore(store, types.KeyPrefix(fmt.Sprintf("%d-", chainId)))
chainStore := prefix.NewStore(store, types.KeyPrefix(fmt.Sprintf("%d-", chainID)))
pageRes, err = query.Paginate(chainStore, pagination, func(key []byte, value []byte) error {
var inTxTracker types.InTxTracker
if err := k.cdc.Unmarshal(value, &inTxTracker); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions zetaclient/bitcoin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (ob *BitcoinChainClient) observeInTx() error {
amount = amount.Mul(amount, big.NewFloat(1e8))
amountInt, _ := amount.Int(nil)
message := hex.EncodeToString(inTx.MemoBytes)
zetaHash, err := ob.zetaClient.PostSend(
msg := GetInBoundVoteMessage(
inTx.FromAddress,
ob.chain.ChainId,
inTx.FromAddress,
Expand All @@ -302,9 +302,10 @@ func (ob *BitcoinChainClient) observeInTx() error {
inTx.BlockNumber,
0,
common.CoinType_Gas,
PostSendEVMGasLimit,
"",
ob.zetaClient.keys.GetOperatorAddress().String(),
)
zetaHash, err := ob.zetaClient.PostSend(PostSendEVMGasLimit, msg)
if err != nil {
ob.logger.WatchInTx.Error().Err(err).Msg("error posting to zeta core")
continue
Expand Down
110 changes: 22 additions & 88 deletions zetaclient/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,19 @@ package zetaclient
import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"fmt"
math2 "math"
"math/big"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"gorm.io/driver/sqlite"
"gorm.io/gorm"

"cosmossdk.io/math"
"github.com/pkg/errors"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -190,6 +186,7 @@ func (ob *EVMChainClient) GetERC20CustodyContract() (*erc20custody.ERC20Custody,
}

func (ob *EVMChainClient) Start() {
go ob.ExternalChainWatcherForNewInboundTrackerSuggestions()
go ob.ExternalChainWatcher() // Observes external Chains for incoming trasnactions
go ob.WatchGasPrice() // Observes external Chains for Gas prices and posts to core
go ob.observeOutTx() // Populates receipts and confirmed outbound transactions
Expand Down Expand Up @@ -683,41 +680,14 @@ func (ob *EVMChainClient) observeInTX() error {
}
// Pull out arguments from logs
for logs.Next() {
event := logs.Event
ob.logger.ExternalChainWatcher.Info().Msgf("TxBlockNumber %d Transaction Hash: %s Message : %s", event.Raw.BlockNumber, event.Raw.TxHash, event.Message)
destChain := common.GetChainFromChainID(event.DestinationChainId.Int64())
if destChain == nil {
ob.logger.ExternalChainWatcher.Warn().Msgf("chain id not supported %d", event.DestinationChainId.Int64())
msg := ob.GetInboundVoteMsgForZetaSentEvent(logs.Event)
if msg == nil {
continue
}
destAddr := clienttypes.BytesToEthHex(event.DestinationAddress)
cfgDest, found := ob.cfg.GetEVMConfig(destChain.ChainId)
if !found {
ob.logger.ExternalChainWatcher.Warn().Msgf("chain id not present in EVMChainConfigs %d", event.DestinationChainId.Int64())
continue
}
if strings.EqualFold(destAddr, cfgDest.ZetaTokenContractAddress) {
ob.logger.ExternalChainWatcher.Warn().Msgf("potential attack attempt: %s destination address is ZETA token contract address %s", destChain, destAddr)
continue
}
zetaHash, err := ob.zetaClient.PostSend(
event.ZetaTxSenderAddress.Hex(),
ob.chain.ChainId,
event.SourceTxOriginAddress.Hex(),
clienttypes.BytesToEthHex(event.DestinationAddress),
destChain.ChainId,
math.NewUintFromBigInt(event.ZetaValueAndGas),
base64.StdEncoding.EncodeToString(event.Message),
event.Raw.TxHash.Hex(),
event.Raw.BlockNumber,
event.DestinationGasLimit.Uint64(),
common.CoinType_Zeta,
PostSendNonEVMGasLimit,
"",
)
zetaHash, err := ob.zetaClient.PostSend(PostSendNonEVMGasLimit, msg)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msg("error posting to zeta core")
continue
return
}
ob.logger.ExternalChainWatcher.Info().Msgf("ZetaSent event detected and reported: PostSend zeta tx: %s", zetaHash)
}
Expand Down Expand Up @@ -750,33 +720,16 @@ func (ob *EVMChainClient) observeInTX() error {

// Pull out arguments from logs
for depositedLogs.Next() {
event := depositedLogs.Event
ob.logger.ExternalChainWatcher.Info().Msgf("TxBlockNumber %d Transaction Hash: %s Message : %s", event.Raw.BlockNumber, event.Raw.TxHash, event.Message)
// TODO :add logger to POSTSEND
if bytes.Compare(event.Message, []byte(DonationMessage)) == 0 {
ob.logger.ExternalChainWatcher.Info().Msgf("thank you rich folk for your donation!: %s", event.Raw.TxHash.Hex())
msg := ob.GetInboundVoteMsgForDepositedEvent(depositedLogs.Event)
if msg == nil {
continue
}
zetaHash, err := ob.zetaClient.PostSend(
"",
ob.chain.ChainId,
"",
clienttypes.BytesToEthHex(event.Recipient),
common.ZetaChain().ChainId,
math.NewUintFromBigInt(event.Amount),
hex.EncodeToString(event.Message),
event.Raw.TxHash.Hex(),
event.Raw.BlockNumber,
1_500_000,
common.CoinType_ERC20,
PostSendEVMGasLimit,
event.Asset.String(),
)
zetaHash, err := ob.zetaClient.PostSend(PostSendEVMGasLimit, msg)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msg("error posting to zeta core")
continue
return
}
ob.logger.ExternalChainWatcher.Info().Msgf("ZRC20Cusotdy Deposited event detected and reported: PostSend zeta tx: %s", zetaHash)
ob.logger.ExternalChainWatcher.Info().Msgf("ZetaSent event detected and reported: PostSend zeta tx: %s", zetaHash)
}
}()

Expand Down Expand Up @@ -813,7 +766,7 @@ func (ob *EVMChainClient) observeInTX() error {
continue
}
if receipt.Status != 1 { // 1: successful, 0: failed
ob.logger.ExternalChainWatcher.Info().Msgf("tx %s failed; don't act", tx.Hash().Hex())
ob.logger.ExternalChainWatcher.Info().Msgf("tx %s failed; don't act", tx.Hash())
continue
}

Expand All @@ -827,12 +780,16 @@ func (ob *EVMChainClient) observeInTX() error {
continue
}
}
zetaHash, err := ob.ReportTokenSentToTSS(tx.Hash(), tx.Value(), receipt, from, tx.Data())
msg := ob.GetInboundVoteMsgForTokenSentToTSS(tx.Hash(), tx.Value(), receipt, from, tx.Data())
if msg == nil {
continue
}
zetaHash, err := ob.zetaClient.PostSend(PostSendEVMGasLimit, msg)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msg("error posting to zeta core")
continue
}
ob.logger.ExternalChainWatcher.Info().Msgf("Gas Deposit detected and reported: PostSend zeta tx: %s", zetaHash)
ob.logger.ExternalChainWatcher.Info().Msgf("ZetaSent event detected and reported: PostSend zeta tx: %s", zetaHash)
}
}
}
Expand Down Expand Up @@ -861,8 +818,11 @@ func (ob *EVMChainClient) observeInTX() error {

from := *tx.From
value := tx.Value.ToInt()

zetaHash, err := ob.ReportTokenSentToTSS(tx.Hash, value, receipt, from, tx.Input)
msg := ob.GetInboundVoteMsgForTokenSentToTSS(tx.Hash, value, receipt, from, tx.Input)
if msg == nil {
continue
}
zetaHash, err := ob.zetaClient.PostSend(PostSendEVMGasLimit, msg)
if err != nil {
ob.logger.ExternalChainWatcher.Error().Err(err).Msg("error posting to zeta core")
continue
Expand All @@ -881,32 +841,6 @@ func (ob *EVMChainClient) observeInTX() error {
return nil
}

func (ob *EVMChainClient) ReportTokenSentToTSS(txhash ethcommon.Hash, value *big.Int, receipt *ethtypes.Receipt, from ethcommon.Address, data []byte) (string, error) {
ob.logger.ExternalChainWatcher.Info().Msgf("TSS inTx detected: %s, blocknum %d", txhash.Hex(), receipt.BlockNumber)
ob.logger.ExternalChainWatcher.Info().Msgf("TSS inTx value: %s", value.String())
ob.logger.ExternalChainWatcher.Info().Msgf("TSS inTx from: %s", from.Hex())
message := ""
if len(data) != 0 {
message = hex.EncodeToString(data)
}
zetaHash, err := ob.zetaClient.PostSend(
from.Hex(),
ob.chain.ChainId,
from.Hex(),
from.Hex(),
common.ZetaChain().ChainId,
math.NewUintFromBigInt(value),
message,
txhash.Hex(),
receipt.BlockNumber.Uint64(),
90_000,
common.CoinType_Gas,
PostSendEVMGasLimit,
"",
)
return zetaHash, err
}

func (ob *EVMChainClient) WatchGasPrice() {

err := ob.PostGasPrice()
Expand Down
Loading

0 comments on commit b764bf7

Please sign in to comment.