Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: speed up Solana withdraw stress test in two go routines #3295

Merged
merged 2 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/zetae2e/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,15 @@ func localE2ETest(cmd *cobra.Command, _ []string) {
verbose,
conf.AdditionalAccounts.UserSolana,
e2etests.TestStressSolanaWithdrawName,
),
)
eg.Go(
solanaWithdrawPerformanceRoutine(
conf,
"perf_spl_withdraw",
deployerRunner,
verbose,
conf.AdditionalAccounts.UserSPL,
e2etests.TestStressSPLWithdrawName,
),
)
Expand Down
8 changes: 1 addition & 7 deletions zetaclient/chains/solana/signer/outbound_tracker_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@ import (
"github.com/zeta-chain/node/zetaclient/logs"
)

const (
// SolanaTransactionTimeout is the timeout for waiting for an outbound to be confirmed.
// A transaction referencing a blockhash older than 150 blocks expires and is rejected by Solana.
SolanaTransactionTimeout = 2 * time.Minute
)

// reportToOutboundTracker launches a goroutine with a timeout to check for tx confirmation;
// it reports the tx to the outbound tracker only if it is confirmed by the Solana network.
func (signer *Signer) reportToOutboundTracker(
Expand Down Expand Up @@ -56,7 +50,7 @@ func (signer *Signer) reportToOutboundTracker(
time.Sleep(5 * time.Second)

// give up if we know the tx is too old and already expired
if time.Since(start) > SolanaTransactionTimeout {
if time.Since(start) > solanaTransactionTimeout {
logger.Info().Msg("outbound is expired")
return nil
}
Expand Down
95 changes: 77 additions & 18 deletions zetaclient/chains/solana/signer/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"fmt"
"strings"
"time"

"cosmossdk.io/errors"
ethcommon "github.com/ethereum/go-ethereum/common"
bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/programs/token"
"github.com/gagliardetto/solana-go/rpc"
"github.com/near/borsh-go"
"github.com/rs/zerolog"

"github.com/zeta-chain/node/pkg/chains"
Expand All @@ -22,10 +24,24 @@ import (
"github.com/zeta-chain/node/zetaclient/chains/interfaces"
"github.com/zeta-chain/node/zetaclient/compliance"
"github.com/zeta-chain/node/zetaclient/keys"
"github.com/zeta-chain/node/zetaclient/logs"
"github.com/zeta-chain/node/zetaclient/metrics"
"github.com/zeta-chain/node/zetaclient/outboundprocessor"
)

const (
// solanaTransactionTimeout is the timeout for waiting for an outbound to be confirmed.
// A transaction referencing a blockhash older than 150 blocks (60~90 secs) expires and is rejected by Solana.
solanaTransactionTimeout = 2 * time.Minute

// broadcastBackoff is the initial backoff duration for retrying broadcast.
broadcastBackoff = 1 * time.Second

// broadcastRetries is the maximum number of attempts for broadcasting a transaction.
// 7 attempts with a doubling backoff wait at most 1 + 2 + 4 + 8 + 16 + 32 + 64 = 127 seconds in total,
// which is good enough for the 2 minute timeout.
broadcastRetries = 7
)

var _ interfaces.ChainSigner = (*Signer)(nil)

// Signer deals with signing Solana transactions and implements the ChainSigner interface
Expand Down Expand Up @@ -170,26 +186,69 @@ func (signer *Signer) TryProcessOutbound(
// set relayer balance metrics
signer.SetRelayerBalanceMetrics(ctx)

// broadcast the signed tx to the Solana network with preflight check
txSig, err := signer.client.SendTransactionWithOpts(
ctx,
tx,
// Commitment "finalized" is too conservative for preflight check and
// it results in repeated broadcast attempts that only 1 will succeed.
// Commitment "processed" will simulate tx against more recent state
// thus fails faster once a tx is already broadcasted and processed by the cluster.
// This reduces the number of "failed" txs due to repeated broadcast attempts.
rpc.TransactionOpts{PreflightCommitment: rpc.CommitmentProcessed},
)
if err != nil {
logger.Error().
Err(err).
Msgf("TryProcessOutbound: broadcast error")
return
// broadcast the signed tx to the Solana network
signer.broadcastOutbound(ctx, tx, chainID, nonce, logger, zetacoreClient)
}

// broadcastOutbound sends the signed transaction to the Solana network, making up to
// broadcastRetries attempts with an increasing backoff.
//
// Each attempt first waits for the current backoff (starting at broadcastBackoff), then
// reads the PDA account (signer.pda). If the PDA nonce is already greater than the outbound
// nonce, the outbound was processed (possibly by another relayer) and the loop stops.
// Failures to fetch or decode the PDA are logged and do not prevent the broadcast attempt.
//
// If sending fails, the backoff is doubled and the next attempt is made. On the first
// successful send, the tx signature is handed to reportToOutboundTracker and the function
// returns. The wait is aborted as soon as ctx is done, so a cancelled context does not
// keep this call sleeping through the remaining retries.
//
// The function returns nothing; every outcome is reported through logger.
func (signer *Signer) broadcastOutbound(
	ctx context.Context,
	tx *solana.Transaction,
	chainID int64,
	nonce uint64,
	logger zerolog.Logger,
	zetacoreClient interfaces.ZetacoreClient,
) {
	// prepare logger fields
	lf := map[string]any{
		logs.FieldMethod: "broadcastOutbound",
		logs.FieldNonce:  nonce,
		logs.FieldTx:     tx.Signatures[0].String(),
	}

	// try broadcasting tx with increasing backoff (1s, 2s, 4s, 8s, 16s, 32s, 64s)
	// to tolerate tx nonce mismatch with PDA nonce or unknown RPC error
	backOff := broadcastBackoff
	for i := 0; i < broadcastRetries; i++ {
		// wait for the backoff, but give up immediately if the context is done;
		// every RPC call below would fail with a done context anyway
		timer := time.NewTimer(backOff)
		select {
		case <-ctx.Done():
			timer.Stop()
			logger.Warn().Err(ctx.Err()).Fields(lf).Msg("context is done, stop broadcasting outbound")
			return
		case <-timer.C:
		}

		// PDA nonce may already be increased by other relayer, no need to retry
		pdaInfo, err := signer.client.GetAccountInfo(ctx, signer.pda)
		if err != nil {
			logger.Error().Err(err).Fields(lf).Msg("unable to get PDA account info")
		} else {
			pda := contracts.PdaInfo{}
			err = borsh.Deserialize(&pda, pdaInfo.Bytes())
			if err != nil {
				logger.Error().Err(err).Fields(lf).Msg("unable to deserialize PDA info")
			} else if pda.Nonce > nonce {
				// err is nil on this path, so it is not attached to the log event
				logger.Info().Fields(lf).Msgf("PDA nonce %d is greater than outbound nonce, stop retrying", pda.Nonce)
				return
			}
		}

		// broadcast the signed tx to the Solana network with preflight check
		txSig, err := signer.client.SendTransactionWithOpts(
			ctx,
			tx,
			// Commitment "finalized" is too conservative for preflight check and
			// it results in repeated broadcast attempts that only 1 will succeed.
			// Commitment "processed" will simulate tx against more recent state
			// thus fails faster once a tx is already broadcasted and processed by the cluster.
			// This reduces the number of "failed" txs due to repeated broadcast attempts.
			rpc.TransactionOpts{PreflightCommitment: rpc.CommitmentProcessed},
		)
		if err != nil {
			logger.Warn().Err(err).Fields(lf).Msg("SendTransactionWithOpts failed")
			backOff *= 2
			continue
		}
		logger.Info().Fields(lf).Msg("broadcasted Solana outbound successfully")

		// successful broadcast; report to the outbound tracker
		signer.reportToOutboundTracker(ctx, zetacoreClient, chainID, nonce, txSig, logger)
		return
	}
}

func (signer *Signer) prepareWithdrawTx(
Expand Down
16 changes: 12 additions & 4 deletions zetaclient/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ import (
)

const (
// evmOutboundLookbackFactor is the factor to determine how many nonces to look back for pending cctxs
// outboundLookbackFactor is the factor to determine how many nonces to look back for pending cctxs
// For example, given an OutboundScheduleLookahead of 120, a pending NonceLow of 1000 and a factor of 1.0,
// the scheduler needs to be able to pick up and schedule any pending cctx with nonce < 880 (1000 - 120 * 1.0)
// NOTE: 1.0 means look back the same number of cctxs as we look ahead
evmOutboundLookbackFactor = 1.0
outboundLookbackFactor = 1.0

// sampling rate for sampled orchestrator logger
loggerSamplingRate = 10
Expand Down Expand Up @@ -428,7 +428,7 @@ func (oc *Orchestrator) ScheduleCctxEVM(
}
outboundScheduleLookahead := observer.ChainParams().OutboundScheduleLookahead
// #nosec G115 always in range
outboundScheduleLookback := uint64(float64(outboundScheduleLookahead) * evmOutboundLookbackFactor)
outboundScheduleLookback := uint64(float64(outboundScheduleLookahead) * outboundLookbackFactor)
// #nosec G115 positive
outboundScheduleInterval := uint64(observer.ChainParams().OutboundScheduleInterval)
criticalInterval := uint64(10) // for critical pending outbound we reduce re-try interval
Expand Down Expand Up @@ -596,8 +596,11 @@ func (oc *Orchestrator) ScheduleCctxSolana(
oc.logger.Error().Msgf("ScheduleCctxSolana: chain observer is not a solana observer")
return
}
// #nosec G115 positive

// outbound keysign scheduler parameters
interval := uint64(observer.ChainParams().OutboundScheduleInterval)
outboundScheduleLookahead := observer.ChainParams().OutboundScheduleLookahead
outboundScheduleLookback := uint64(float64(outboundScheduleLookahead) * outboundLookbackFactor)

// schedule keysign for each pending cctx
for _, cctx := range cctxList {
Expand All @@ -610,6 +613,11 @@ func (oc *Orchestrator) ScheduleCctxSolana(
Msgf("ScheduleCctxSolana: outbound %s chainid mismatch: want %d, got %d", outboundID, chainID, params.ReceiverChainId)
continue
}
if params.TssNonce > cctxList[0].GetCurrentOutboundParam().TssNonce+outboundScheduleLookback {
oc.logger.Warn().Msgf("ScheduleCctxSolana: nonce too high: signing %d, earliest pending %d",
params.TssNonce, cctxList[0].GetCurrentOutboundParam().TssNonce)
break
}

// vote outbound if it's already confirmed
continueKeysign, err := solObserver.VoteOutboundIfConfirmed(ctx, cctx)
Expand Down
Loading