Skip to content

Commit

Permalink
make Solana inbound e2e test passing
Browse files Browse the repository at this point in the history
  • Loading branch information
ws4charlie committed Jul 16, 2024
1 parent be0388a commit b3f277e
Show file tree
Hide file tree
Showing 16 changed files with 92 additions and 66 deletions.
1 change: 1 addition & 0 deletions cmd/zetaclientd/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func CreateChainObserverMap(
solChainParams,
zetacoreClient,
tss,
dbpath,
logger,
ts,
)
Expand Down
7 changes: 4 additions & 3 deletions cmd/zetae2e/config/localnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ additional_accounts:
evm_address: "0x283d810090EdF4043E75247eAeBcE848806237fD"
private_key: "7bb523963ee2c78570fb6113d886a4184d42565e8847f1cb639f5f5e2ef5b37a"
user_solana:
bech32_address: ""
evm_address: ""
private_key: "4yqSQxDeTBvn86BuxcN5jmZb2gaobFXrBqu8kiE9rZxNkVMe3LfXmFigRsU4sRp7vk4vVP1ZCFiejDKiXBNWvs2C"
bech32_address: "zeta1zqlajgj0qr8rqylf2c572t0ux8vqt45d4zngpm"
evm_address: "0x103FD9224F00ce3013e95629e52DFc31D805D68d"
private_key: "dd53f191113d18e57bd4a5494a64a020ba7919c815d0a6d34a42ebb2839e9d95"
base58_private_key: "4yqSQxDeTBvn86BuxcN5jmZb2gaobFXrBqu8kiE9rZxNkVMe3LfXmFigRsU4sRp7vk4vVP1ZCFiejDKiXBNWvs2C"
user_ether:
bech32_address: "zeta134rakuus43xn63yucgxhn88ywj8ewcv6ezn2ga"
evm_address: "0x8D47Db7390AC4D3D449Cc20D799ce4748F97619A"
Expand Down
3 changes: 0 additions & 3 deletions contrib/localnet/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,6 @@ services:
image: solana-local:latest
container_name: solana
hostname: solana
profiles:
- solana
- all
ports:
- "8899:8899"
networks:
Expand Down
7 changes: 4 additions & 3 deletions e2e/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ type Config struct {

// Account contains configuration for an account
type Account struct {
RawBech32Address DoubleQuotedString `yaml:"bech32_address"`
RawEVMAddress DoubleQuotedString `yaml:"evm_address"`
RawPrivateKey DoubleQuotedString `yaml:"private_key"`
RawBech32Address DoubleQuotedString `yaml:"bech32_address"`
RawEVMAddress DoubleQuotedString `yaml:"evm_address"`
RawPrivateKey DoubleQuotedString `yaml:"private_key"`
RawBase58PrivateKey DoubleQuotedString `yaml:"base58_private_key"`
}

// AdditionalAccounts are extra accounts required to run specific tests
Expand Down
7 changes: 4 additions & 3 deletions e2e/e2etests/test_solana_deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

func TestSolanaDeposit(r *runner.E2ERunner, _ []string) {
// load deployer private key
privkey := solana.MustPrivateKeyFromBase58(r.Account.RawPrivateKey.String())
privkey := solana.MustPrivateKeyFromBase58(r.Account.RawBase58PrivateKey.String())

// compute the gateway PDA address
pdaComputed := r.ComputePdaAddress()
Expand All @@ -32,8 +32,8 @@ func TestSolanaDeposit(r *runner.E2ERunner, _ []string) {
var err error
inst.DataBytes, err = borsh.Serialize(solanacontract.DepositInstructionParams{
Discriminator: solanacontract.DiscriminatorDeposit(),
Amount: 1338,
Memo: []byte("hello this is a good memo for you to enjoy"),
Amount: 13370000,
Memo: r.EVMAddress().Bytes(),
})
require.NoError(r, err)

Expand All @@ -42,6 +42,7 @@ func TestSolanaDeposit(r *runner.E2ERunner, _ []string) {

// broadcast the transaction and wait for finalization
sig, out := r.BroadcastTxSync(signedTx)
r.Logger.Print("deposit receiver address: %s", r.EVMAddress().String())
r.Logger.Print("deposit logs: %v", out.Meta.LogMessages)

// wait for the cctx to be mined
Expand Down
2 changes: 1 addition & 1 deletion e2e/e2etests/test_solana_initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestSolanaInitializeGateway(r *runner.E2ERunner, args []string) {
r.Logger.Print("solana version: %+v", res)

// get deployer account balance
privkey := solana.MustPrivateKeyFromBase58(r.Account.RawPrivateKey.String())
privkey := solana.MustPrivateKeyFromBase58(r.Account.RawBase58PrivateKey.String())
r.Logger.Print("deployer pubkey: %s", privkey.PublicKey().String())
bal, err := client.GetBalance(context.TODO(), privkey.PublicKey(), rpc.CommitmentFinalized)
require.NoError(r, err)
Expand Down
4 changes: 2 additions & 2 deletions e2e/runner/setup_solana.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func (r *E2ERunner) SetupSolanaAccount() {

// SetSolanaAddress imports the deployer's private key
func (r *E2ERunner) SetSolanaAddress() {
privateKey := solana.MustPrivateKeyFromBase58(r.Account.RawPrivateKey.String())
privateKey := solana.MustPrivateKeyFromBase58(r.Account.RawBase58PrivateKey.String())
r.SolanaDeployerAddress = privateKey.PublicKey()

r.Logger.Info("SolanaDeployerAddress: %s", r.BTCDeployerAddress.EncodeAddress())
r.Logger.Info("SolanaDeployerAddress: %s", r.SolanaDeployerAddress)
}
7 changes: 0 additions & 7 deletions e2e/txserver/zeta_tx_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package txserver

import (
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
Expand Down Expand Up @@ -205,11 +204,6 @@ func (zts ZetaTxServer) BroadcastTx(account string, msg sdktypes.Msg) (*sdktypes
return nil, err
}

{
tx := txBuilder.GetTx()
fmt.Printf("txBuilder.GetTx(): fee %s, gas %d", tx.GetFee().String(), tx.GetGas())
}

// Sign tx
err = tx.Sign(zts.txFactory, account, txBuilder, true)
if err != nil {
Expand All @@ -223,7 +217,6 @@ func (zts ZetaTxServer) BroadcastTx(account string, msg sdktypes.Msg) (*sdktypes
}

func broadcastWithBlockTimeout(zts ZetaTxServer, txBytes []byte) (*sdktypes.TxResponse, error) {
fmt.Printf("broadcasting tx:\n%s\n", base64.StdEncoding.EncodeToString(txBytes))
res, err := zts.clientCtx.BroadcastTx(txBytes)
if err != nil {
if res == nil {
Expand Down
15 changes: 15 additions & 0 deletions pkg/bg/bg.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package bg
import (
"context"
"fmt"
"runtime"

"github.com/rs/zerolog"
)
Expand Down Expand Up @@ -39,6 +40,7 @@ func Work(ctx context.Context, f func(context.Context) error, opts ...Opt) {
if r := recover(); r != nil {
err := fmt.Errorf("recovered from PANIC in background task: %v", r)
logError(err, cfg)
printStack()
}
}()

Expand All @@ -60,3 +62,16 @@ func logError(err error, cfg config) {

cfg.logger.Error().Err(err).Str("worker.name", name).Msgf("Background task failed")
}

func printStack() {
buf := make([]byte, 1024)
for {
n := runtime.Stack(buf, false)
if n < len(buf) {
buf = buf[:n]
break
}
buf = make([]byte, 2*len(buf))
}
fmt.Printf("Stack trace:\n%s\n", string(buf))
}
21 changes: 2 additions & 19 deletions zetaclient/chains/solana/observer/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package observer

import (
"github.com/pkg/errors"

solanarpc "github.com/zeta-chain/zetacore/zetaclient/chains/solana/rpc"
)

// LoadDB open sql database and load data into Solana observer
Expand All @@ -18,29 +16,14 @@ func (ob *Observer) LoadDB(dbPath string) error {
return errors.Wrapf(err, "error OpenDB for chain %d", ob.Chain().ChainId)
}

// load last scanned tx
err = ob.LoadLastTxScanned()
ob.Observer.LoadLastTxScanned(ob.Logger().Chain)

return err
return nil
}

// LoadLastTxScanned loads the last scanned tx from the database.
func (ob *Observer) LoadLastTxScanned() error {
ob.Observer.LoadLastTxScanned(ob.Logger().Chain)

// when last scanned tx is absent in the database, the observer will scan from the 1st signature for the gateway address.
// this is useful when bootstrapping the Solana observer
if ob.LastTxScanned() == "" {
firstSigature, err := solanarpc.GetFirstSignatureForAddress(
ob.solClient,
ob.gatewayID,
solanarpc.DefaultPageLimit,
)
if err != nil {
return err
}
ob.WithLastTxScanned(firstSigature.String())
}
ob.Logger().Chain.Info().Msgf("chain %d starts scanning from tx %s", ob.Chain().ChainId, ob.LastTxScanned())

return nil
Expand Down
8 changes: 4 additions & 4 deletions zetaclient/chains/solana/observer/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func MockSolanaObserver(
chainParams observertypes.ChainParams,
zetacoreClient interfaces.ZetacoreClient,
tss interfaces.TSSSigner,
dbpath string,
) *observer.Observer {
// use mock zetacore client if not provided
if zetacoreClient == nil {
Expand All @@ -40,6 +41,7 @@ func MockSolanaObserver(
chainParams,
zetacoreClient,
tss,
dbpath,
base.DefaultLogger(),
nil,
)
Expand All @@ -56,8 +58,7 @@ func Test_LoadDB(t *testing.T) {
dbpath := sample.CreateTempDir(t)

// create observer
ob := MockSolanaObserver(t, chain, nil, *params, nil, nil)
ob.OpenDB(dbpath, "")
ob := MockSolanaObserver(t, chain, nil, *params, nil, nil, dbpath)

// write last tx to db
lastTx := sample.SolanaSignature(t).String()
Expand Down Expand Up @@ -87,8 +88,7 @@ func Test_LoadLastTxScanned(t *testing.T) {
dbpath := sample.CreateTempDir(t)

// create observer
ob := MockSolanaObserver(t, chain, nil, *params, nil, nil)
ob.OpenDB(dbpath, "")
ob := MockSolanaObserver(t, chain, nil, *params, nil, nil, dbpath)

t.Run("should load last block scanned", func(t *testing.T) {
// write sample last tx to db
Expand Down
38 changes: 25 additions & 13 deletions zetaclient/chains/solana/observer/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ const (
)

// WatchInbound watches Solana chain for inbounds on a ticker.
// It starts a ticker and run ObserveInbound.
// TODO(revamp): move all ticker related methods in the same file.
func (ob *Observer) WatchInbound(ctx context.Context) error {
app, err := zctx.FromContext(ctx)
if err != nil {
Expand Down Expand Up @@ -59,7 +57,7 @@ func (ob *Observer) WatchInbound(ctx context.Context) error {
Msgf("WatchInbound: inbound observation is disabled for chain %d", ob.Chain().ChainId)
continue
}
err := ob.ObserveInbound(ctx, sampledLogger)
err := ob.ObserveInbound(ctx)
if err != nil {
ob.Logger().Inbound.Err(err).Msg("WatchInbound: observeInbound error")
}
Expand All @@ -71,18 +69,30 @@ func (ob *Observer) WatchInbound(ctx context.Context) error {
}

// ObserveInbound observes the Bitcoin chain for inbounds and post votes to zetacore.
func (ob *Observer) ObserveInbound(ctx context.Context, sampledLogger zerolog.Logger) error {
func (ob *Observer) ObserveInbound(ctx context.Context) error {
chainID := ob.Chain().ChainId
pageLimit := solanarpc.DefaultPageLimit
lastSig := solana.MustSignatureFromBase58(ob.LastTxScanned())

// scan from gateway 1st signature if last scanned tx is absent in the database
// the 1st gateway signature is typically the program initialization
if ob.LastTxScanned() == "" {
lastSig, err := solanarpc.GetFirstSignatureForAddress(ob.solClient, ob.gatewayID, pageLimit)
if err != nil {
return errors.Wrapf(err, "error GetFirstSignatureForAddress for chain %d address %s", chainID, ob.gatewayID)
}
ob.WithLastTxScanned(lastSig.String())
}

// get all signatures for the gateway address since last scanned signature
lastSig := solana.MustSignatureFromBase58(ob.LastTxScanned())
signatures, err := solanarpc.GetSignaturesForAddressUntil(ob.solClient, ob.gatewayID, lastSig, pageLimit)
if err != nil {
ob.Logger().Inbound.Err(err).Msg("error GetSignaturesForAddressUntil")
return err
}
sampledLogger.Info().Msgf("ObserveInbound: got %d signatures for chain %d", len(signatures), chainID)
if len(signatures) > 0 {
ob.Logger().Inbound.Info().Msgf("ObserveInbound: got %d signatures for chain %d", len(signatures), chainID)
}

// loop signature from oldest to latest to filter inbound events
for i := len(signatures) - 1; i >= 0; i-- {
Expand Down Expand Up @@ -112,7 +122,7 @@ func (ob *Observer) ObserveInbound(ctx context.Context, sampledLogger zerolog.Lo
Err(err).
Msgf("ObserveInbound: error saving last sig %s for chain %d", sigString, chainID)
}
sampledLogger.Info().Msgf("ObserveInbound: last scanned sig for chain %d is %s", chainID, sigString)
ob.Logger().Inbound.Info().Msgf("ObserveInbound: last scanned sig for chain %d is %s", chainID, sigString)

// take a rest if max signatures per ticker is reached
if len(signatures)-i >= MaxSignaturesPerTicker {
Expand All @@ -132,11 +142,13 @@ func (ob *Observer) FilterInboundEventAndVote(ctx context.Context, txResult *rpc
}

// build inbound vote message from event and post to zetacore
msg := ob.BuildInboundVoteMsgFromEvent(event)
if msg != nil {
_, err = ob.PostVoteInbound(ctx, msg, zetacore.PostVoteInboundExecutionGasLimit)
if err != nil {
return errors.Wrapf(err, "error PostVoteInbound")
if event != nil {
msg := ob.BuildInboundVoteMsgFromEvent(event)
if msg != nil {
_, err = ob.PostVoteInbound(ctx, msg, zetacore.PostVoteInboundExecutionGasLimit)
if err != nil {
return errors.Wrapf(err, "error PostVoteInbound")
}
}
}

Expand Down Expand Up @@ -237,7 +249,7 @@ func (ob *Observer) ParseInboundAsDeposit(
}

// check if the instruction is a deposit or not
if inst.Discriminator == contract.DiscriminatorDeposit() {
if inst.Discriminator != contract.DiscriminatorDeposit() {
return nil, nil
}

Expand Down
20 changes: 13 additions & 7 deletions zetaclient/chains/solana/observer/inbound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func Test_FilterInboundEventAndVote(t *testing.T) {
chainParams := sample.ChainParams(chain.ChainId)
chainParams.GatewayAddress = "2kJndCL9NBR36ySiQ4bmArs4YgWQu67LmCDfLzk5Gb7s"
zetacoreClient := mocks.NewZetacoreClient(t).WithKeys(&keys.Keys{})
ob, err := observer.NewObserver(chain, nil, *chainParams, zetacoreClient, nil, base.DefaultLogger(), nil)
dbpath := sample.CreateTempDir(t)
ob, err := observer.NewObserver(chain, nil, *chainParams, zetacoreClient, nil, dbpath, base.DefaultLogger(), nil)
require.NoError(t, err)

t.Run("should filter inbound event vote", func(t *testing.T) {
Expand All @@ -53,7 +54,8 @@ func Test_FilterInboundEvent(t *testing.T) {
// create observer
chainParams := sample.ChainParams(chain.ChainId)
chainParams.GatewayAddress = "2kJndCL9NBR36ySiQ4bmArs4YgWQu67LmCDfLzk5Gb7s"
ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, base.DefaultLogger(), nil)
dbpath := sample.CreateTempDir(t)
ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, dbpath, base.DefaultLogger(), nil)
require.NoError(t, err)

// expected result
Expand Down Expand Up @@ -87,7 +89,8 @@ func Test_BuildInboundVoteMsgFromEvent(t *testing.T) {
params := sample.ChainParams(chain.ChainId)
params.GatewayAddress = sample.SolanaAddress(t)
zetacoreClient := mocks.NewZetacoreClient(t).WithKeys(&keys.Keys{})
ob, err := observer.NewObserver(chain, nil, *params, zetacoreClient, nil, base.DefaultLogger(), nil)
dbpath := sample.CreateTempDir(t)
ob, err := observer.NewObserver(chain, nil, *params, zetacoreClient, nil, dbpath, base.DefaultLogger(), nil)
require.NoError(t, err)

// create test compliance config
Expand All @@ -97,15 +100,16 @@ func Test_BuildInboundVoteMsgFromEvent(t *testing.T) {

t.Run("should return vote msg for valid event", func(t *testing.T) {
sender := sample.SolanaAddress(t)
event := sample.InboundEvent(chain.ChainId, sender, sender, 1280, []byte("a good memo"))
memo := sample.EthAddress().Bytes()
event := sample.InboundEvent(chain.ChainId, sender, sender, 1280, []byte(memo))

msg := ob.BuildInboundVoteMsgFromEvent(event)
require.NotNil(t, msg)
})
t.Run("should return nil msg if sender is restricted", func(t *testing.T) {
sender := sample.SolanaAddress(t)
receiver := sample.SolanaAddress(t)
event := sample.InboundEvent(chain.ChainId, sender, receiver, 1280, []byte("a good memo"))
event := sample.InboundEvent(chain.ChainId, sender, receiver, 1280, nil)

// restrict sender
cfg.ComplianceConfig.RestrictedAddresses = []string{sender}
Expand All @@ -117,7 +121,8 @@ func Test_BuildInboundVoteMsgFromEvent(t *testing.T) {
t.Run("should return nil msg if receiver is restricted", func(t *testing.T) {
sender := sample.SolanaAddress(t)
receiver := sample.SolanaAddress(t)
event := sample.InboundEvent(chain.ChainId, sender, receiver, 1280, []byte("a good memo"))
memo := sample.EthAddress().Bytes()
event := sample.InboundEvent(chain.ChainId, sender, receiver, 1280, []byte(memo))

// restrict receiver
cfg.ComplianceConfig.RestrictedAddresses = []string{receiver}
Expand Down Expand Up @@ -149,7 +154,8 @@ func Test_ParseInboundAsDeposit(t *testing.T) {
// create observer
chainParams := sample.ChainParams(chain.ChainId)
chainParams.GatewayAddress = "2kJndCL9NBR36ySiQ4bmArs4YgWQu67LmCDfLzk5Gb7s"
ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, base.DefaultLogger(), nil)
dbpath := sample.CreateTempDir(t)
ob, err := observer.NewObserver(chain, nil, *chainParams, nil, nil, dbpath, base.DefaultLogger(), nil)
require.NoError(t, err)

// expected result
Expand Down
Loading

0 comments on commit b3f277e

Please sign in to comment.