Skip to content

Commit

Permalink
fix: speed up Solana withdraw stress test in two go routines (#3295)
Browse files Browse the repository at this point in the history
  • Loading branch information
ws4charlie authored Dec 13, 2024
1 parent f4349a3 commit e4955b7
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 29 deletions.
9 changes: 9 additions & 0 deletions cmd/zetae2e/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,15 @@ func localE2ETest(cmd *cobra.Command, _ []string) {
verbose,
conf.AdditionalAccounts.UserSolana,
e2etests.TestStressSolanaWithdrawName,
),
)
eg.Go(
solanaWithdrawPerformanceRoutine(
conf,
"perf_spl_withdraw",
deployerRunner,
verbose,
conf.AdditionalAccounts.UserSPL,
e2etests.TestStressSPLWithdrawName,
),
)
Expand Down
8 changes: 1 addition & 7 deletions zetaclient/chains/solana/signer/outbound_tracker_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ import (
"github.com/zeta-chain/node/zetaclient/logs"
)

const (
// SolanaTransactionTimeout is the timeout for waiting for an outbound to be confirmed
// Transaction referencing a blockhash older than 150 blocks will expire and be rejected by Solana.
SolanaTransactionTimeout = 2 * time.Minute
)

// reportToOutboundTracker launch a go routine with timeout to check for tx confirmation;
// it reports tx to outbound tracker only if it's confirmed by the Solana network.
func (signer *Signer) reportToOutboundTracker(
Expand Down Expand Up @@ -56,7 +50,7 @@ func (signer *Signer) reportToOutboundTracker(
time.Sleep(5 * time.Second)

// give up if we know the tx is too old and already expired
if time.Since(start) > SolanaTransactionTimeout {
if time.Since(start) > solanaTransactionTimeout {
logger.Info().Msg("outbound is expired")
return nil
}
Expand Down
95 changes: 77 additions & 18 deletions zetaclient/chains/solana/signer/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"fmt"
"strings"
"time"

"cosmossdk.io/errors"
ethcommon "github.com/ethereum/go-ethereum/common"
bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/programs/token"
"github.com/gagliardetto/solana-go/rpc"
"github.com/near/borsh-go"
"github.com/rs/zerolog"

"github.com/zeta-chain/node/pkg/chains"
Expand All @@ -22,10 +24,24 @@ import (
"github.com/zeta-chain/node/zetaclient/chains/interfaces"
"github.com/zeta-chain/node/zetaclient/compliance"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/logs"
"github.com/zeta-chain/node/zetaclient/metrics"
"github.com/zeta-chain/node/zetaclient/outboundprocessor"
)

const (
// solanaTransactionTimeout is the timeout for waiting for an outbound to be confirmed.
// Transaction referencing a blockhash older than 150 blocks (60 ~90 secs) will expire and be rejected by Solana.
solanaTransactionTimeout = 2 * time.Minute

// broadcastBackoff is the initial backoff duration for retrying broadcast
broadcastBackoff = 1 * time.Second

// broadcastRetries is the maximum number of retries for broadcasting a transaction
// 6 retries will span over 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds, good enough for the 2 minute timeout
broadcastRetries = 7
)

var _ interfaces.ChainSigner = (*Signer)(nil)

// Signer deals with signing Solana transactions and implements the ChainSigner interface
Expand Down Expand Up @@ -170,26 +186,69 @@ func (signer *Signer) TryProcessOutbound(
// set relayer balance metrics
signer.SetRelayerBalanceMetrics(ctx)

// broadcast the signed tx to the Solana network with preflight check
txSig, err := signer.client.SendTransactionWithOpts(
ctx,
tx,
// Commitment "finalized" is too conservative for preflight check and
// it results in repeated broadcast attempts that only 1 will succeed.
// Commitment "processed" will simulate tx against more recent state
// thus fails faster once a tx is already broadcasted and processed by the cluster.
// This reduces the number of "failed" txs due to repeated broadcast attempts.
rpc.TransactionOpts{PreflightCommitment: rpc.CommitmentProcessed},
)
if err != nil {
logger.Error().
Err(err).
Msgf("TryProcessOutbound: broadcast error")
return
// broadcast the signed tx to the Solana network
signer.broadcastOutbound(ctx, tx, chainID, nonce, logger, zetacoreClient)
}

// broadcastOutbound sends the signed transaction to the Solana network
func (signer *Signer) broadcastOutbound(
ctx context.Context,
tx *solana.Transaction,
chainID int64,
nonce uint64,
logger zerolog.Logger,
zetacoreClient interfaces.ZetacoreClient,
) {
// prepare logger fields
lf := map[string]any{
logs.FieldMethod: "broadcastOutbound",
logs.FieldNonce: nonce,
logs.FieldTx: tx.Signatures[0].String(),
}

// report the outbound to the outbound tracker
signer.reportToOutboundTracker(ctx, zetacoreClient, chainID, nonce, txSig, logger)
// try broacasting tx with increasing backoff (1s, 2s, 4s, 8s, 16s, 32s, 64s)
// to tolerate tx nonce mismatch with PDA nonce or unknown RPC error
backOff := broadcastBackoff
for i := 0; i < broadcastRetries; i++ {
time.Sleep(backOff)

// PDA nonce may already be increased by other relayer, no need to retry
pdaInfo, err := signer.client.GetAccountInfo(ctx, signer.pda)
if err != nil {
logger.Error().Err(err).Fields(lf).Msgf("unable to get PDA account info")
} else {
pda := contracts.PdaInfo{}
err = borsh.Deserialize(&pda, pdaInfo.Bytes())
if err != nil {
logger.Error().Err(err).Fields(lf).Msgf("unable to deserialize PDA info")
} else if pda.Nonce > nonce {
logger.Info().Err(err).Fields(lf).Msgf("PDA nonce %d is greater than outbound nonce, stop retrying", pda.Nonce)
break
}
}

// broadcast the signed tx to the Solana network with preflight check
txSig, err := signer.client.SendTransactionWithOpts(
ctx,
tx,
// Commitment "finalized" is too conservative for preflight check and
// it results in repeated broadcast attempts that only 1 will succeed.
// Commitment "processed" will simulate tx against more recent state
// thus fails faster once a tx is already broadcasted and processed by the cluster.
// This reduces the number of "failed" txs due to repeated broadcast attempts.
rpc.TransactionOpts{PreflightCommitment: rpc.CommitmentProcessed},
)
if err != nil {
logger.Warn().Err(err).Fields(lf).Msgf("SendTransactionWithOpts failed")
backOff *= 2
continue
}
logger.Info().Fields(lf).Msgf("broadcasted Solana outbound successfully")

// successful broadcast; report to the outbound tracker
signer.reportToOutboundTracker(ctx, zetacoreClient, chainID, nonce, txSig, logger)
break
}
}

func (signer *Signer) prepareWithdrawTx(
Expand Down
16 changes: 12 additions & 4 deletions zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ import (
)

const (
// evmOutboundLookbackFactor is the factor to determine how many nonces to look back for pending cctxs
// outboundLookbackFactor is the factor to determine how many nonces to look back for pending cctxs
// For example, give OutboundScheduleLookahead of 120, pending NonceLow of 1000 and factor of 1.0,
// the scheduler need to be able to pick up and schedule any pending cctx with nonce < 880 (1000 - 120 * 1.0)
// NOTE: 1.0 means look back the same number of cctxs as we look ahead
evmOutboundLookbackFactor = 1.0
outboundLookbackFactor = 1.0

// sampling rate for sampled orchestrator logger
loggerSamplingRate = 10
Expand Down Expand Up @@ -428,7 +428,7 @@ func (oc *Orchestrator) ScheduleCctxEVM(
}
outboundScheduleLookahead := observer.ChainParams().OutboundScheduleLookahead
// #nosec G115 always in range
outboundScheduleLookback := uint64(float64(outboundScheduleLookahead) * evmOutboundLookbackFactor)
outboundScheduleLookback := uint64(float64(outboundScheduleLookahead) * outboundLookbackFactor)
// #nosec G115 positive
outboundScheduleInterval := uint64(observer.ChainParams().OutboundScheduleInterval)
criticalInterval := uint64(10) // for critical pending outbound we reduce re-try interval
Expand Down Expand Up @@ -596,8 +596,11 @@ func (oc *Orchestrator) ScheduleCctxSolana(
oc.logger.Error().Msgf("ScheduleCctxSolana: chain observer is not a solana observer")
return
}
// #nosec G115 positive

// outbound keysign scheduler parameters
interval := uint64(observer.ChainParams().OutboundScheduleInterval)
outboundScheduleLookahead := observer.ChainParams().OutboundScheduleLookahead
outboundScheduleLookback := uint64(float64(outboundScheduleLookahead) * outboundLookbackFactor)

// schedule keysign for each pending cctx
for _, cctx := range cctxList {
Expand All @@ -610,6 +613,11 @@ func (oc *Orchestrator) ScheduleCctxSolana(
Msgf("ScheduleCctxSolana: outbound %s chainid mismatch: want %d, got %d", outboundID, chainID, params.ReceiverChainId)
continue
}
if params.TssNonce > cctxList[0].GetCurrentOutboundParam().TssNonce+outboundScheduleLookback {
oc.logger.Warn().Msgf("ScheduleCctxSolana: nonce too high: signing %d, earliest pending %d",
params.TssNonce, cctxList[0].GetCurrentOutboundParam().TssNonce)
break
}

// vote outbound if it's already confirmed
continueKeysign, err := solObserver.VoteOutboundIfConfirmed(ctx, cctx)
Expand Down

0 comments on commit e4955b7

Please sign in to comment.