diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go index 6182b486f..b2b4ca7d8 100644 --- a/relayer/chains/cosmos/tx.go +++ b/relayer/chains/cosmos/tx.go @@ -151,7 +151,7 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela wg.Add(1) if err := retry.Do(func() error { - return cc.SendMessagesToMempool(ctx, msgs, memo, ctx, []func(*provider.RelayerTxResponse, error){callback}) + return cc.SendMessagesToMempool(ctx, [][]provider.RelayerMessage{msgs}, memo, ctx, []func(*provider.RelayerTxResponse, error){callback}) }, retry.Context(ctx), rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) { cc.log.Info( "Error building or broadcasting transaction", @@ -200,36 +200,52 @@ type jsonrpcMessage struct { // If there is no error broadcasting, the asyncCallback will be called with success/failure of the wait for block inclusion. func (cc *CosmosProvider) SendMessagesToMempool( ctx context.Context, - msgs []provider.RelayerMessage, + msgs [][]provider.RelayerMessage, memo string, - asyncCtx context.Context, asyncCallbacks []func(*provider.RelayerTxResponse, error), ) error { - txSignerKey, feegranterKeyOrAddr, err := cc.buildSignerConfig(msgs) - if err != nil { - return err - } + var txf *tx.Factory + var txbs []client.TxBuilder + var sequenceGuard *WalletState + done := cc.SetSDKContext() + dynamicFee := cc.DynamicFee(ctx) + var txSignerKey string + var feegranterKeyOrAddr string + var err error + for _, batch := range msgs { + txSignerKey, feegranterKeyOrAddr, err = cc.buildSignerConfig(batch) + if err != nil { + return err + } - sequenceGuard := ensureSequenceGuard(cc, txSignerKey) - sequenceGuard.Mu.Lock() - defer sequenceGuard.Mu.Unlock() + if sequenceGuard == nil { + sequenceGuard = ensureSequenceGuard(cc, txSignerKey) + sequenceGuard.Mu.Lock() + defer sequenceGuard.Mu.Unlock() + } - dynamicFee := cc.DynamicFee(ctx) - done := cc.SetSDKContext() - txf, txb, err := cc.getTxFactoryAndBuilder(ctx, msgs, memo, 0, txSignerKey, feegranterKeyOrAddr, sequenceGuard, dynamicFee) - if err != nil { - done() - return err + f, txb, err := cc.getTxFactoryAndBuilder(ctx, batch, memo, 0, txSignerKey, feegranterKeyOrAddr, sequenceGuard, dynamicFee) + if err != nil { + done() + return err + } + if txf == nil { + txf = f + } else { + f := txf.WithGas(txf.Gas() + f.Gas()) + txf = &f + } + txbs = append(txbs, txb) } var txBytes []byte var sequence uint64 var fees sdk.Coins if len(cc.PCfg.PrecompiledContractAddress) > 0 { - txBytes, sequence, fees, err = cc.buildEvmMessages(ctx, txf, txb) + txBytes, sequence, fees, err = cc.buildEvmMessages(ctx, txf, txbs) } else { - txBytes, sequence, fees, err = cc.buildMessages(ctx, txf, txb, txSignerKey) + txBytes, sequence, fees, err = cc.buildMessages(ctx, txf, txbs[0], txSignerKey) } done() if err != nil { @@ -244,7 +260,7 @@ func (cc *CosmosProvider) SendMessagesToMempool( err = cc.broadcastTx( ctx, txBytes, - msgs, + msgs[0], //TODO fees, asyncCtx, defaultBroadcastWaitTimeout, @@ -868,7 +884,7 @@ func extractSigners(msg proto.Message) []sdk.AccAddress { func (cc *CosmosProvider) buildEvmMessages( ctx context.Context, txf *tx.Factory, - txb client.TxBuilder, + txbs []client.TxBuilder, ) ([]byte, uint64, sdk.Coins, error) { chainID, err := ethermintcodecs.ParseChainID(cc.PCfg.ChainID) if err != nil { @@ -895,56 +911,62 @@ func (cc *CosmosProvider) buildEvmMessages( contractAddress := common.HexToAddress(cc.PCfg.PrecompiledContractAddress) blockNumber := new(big.Int) - msgs := txb.GetTx().GetMsgs() - signers := extractSigners(msgs[0]) - if len(signers) != 1 { - return nil, 0, sdk.Coins{}, sdkerrors.Wrapf(legacyerrors.ErrUnknownRequest, "invalid signers length %d", len(signers)) - } - from, err := convertAddress(signers[0].String()) - if err != nil { - return nil, 0, sdk.Coins{}, err - } - data, err := cc.getPayloads(contractAddress, msgs) - if err != nil { - return nil, 0, sdk.Coins{}, err - } - nonce := txf.Sequence() - amount := big.NewInt(0) - tx := evmtypes.NewTx( - chainID, nonce, &contractAddress, amount, - gasLimit, gasPrice, gasFeeCap, gasTipCap, - data, ðtypes.AccessList{}) - tx.From = from.Bytes() - if err := tx.ValidateBasic(); err != nil { - cc.log.Info("tx failed basic validation", zap.Error(err)) - return nil, 0, sdk.Coins{}, err - } - if err := retry.Do(func() error { - signer := ethtypes.MakeSigner(getChainConfig(chainID), blockNumber) - if err := tx.Sign(signer, cc.Keybase); err != nil { - return err + options := make([]*types.Any, 0) + txs := make([]proto.Message, 0) + for _, txb := range txbs { + msgs := txb.GetTx().GetMsgs() + signers := extractSigners(msgs[0]) + if len(signers) != 1 { + return nil, 0, sdk.Coins{}, sdkerrors.Wrapf(legacyerrors.ErrUnknownRequest, "invalid signers length %d", len(signers)) } - return nil - }, retry.Context(ctx), rtyAtt, rtyDel, rtyErr); err != nil { - return nil, 0, sdk.Coins{}, err - } - if feeAmt := sdkmath.NewIntFromBigInt(tx.GetFee()); feeAmt.Sign() > 0 { - fees = fees.Add(sdk.NewCoin(gasPriceCoin.Denom, feeAmt)) + from, err := convertAddress(signers[0].String()) + if err != nil { + return nil, 0, sdk.Coins{}, err + } + data, err := cc.getPayloads(contractAddress, msgs) + if err != nil { + return nil, 0, sdk.Coins{}, err + } + nonce := txf.Sequence() + amount := big.NewInt(0) + tx := evmtypes.NewTx( + chainID, nonce, &contractAddress, amount, + gasLimit, gasPrice, gasFeeCap, gasTipCap, + data, ðtypes.AccessList{}) + tx.From = from.Bytes() + if err := tx.ValidateBasic(); err != nil { + cc.log.Info("tx failed basic validation", zap.Error(err)) + return nil, 0, sdk.Coins{}, err + } + if err := retry.Do(func() error { + signer := ethtypes.MakeSigner(getChainConfig(chainID), blockNumber) + if err := tx.Sign(signer, cc.Keybase); err != nil { + return err + } + return nil + }, retry.Context(ctx), rtyAtt, rtyDel, rtyErr); err != nil { + return nil, 0, sdk.Coins{}, err + } + if feeAmt := sdkmath.NewIntFromBigInt(tx.GetFee()); feeAmt.Sign() > 0 { + fees = fees.Add(sdk.NewCoin(gasPriceCoin.Denom, feeAmt)) + } + txGasLimit += tx.GetGas() + cc.log.Info("append", zap.String("hash", tx.Hash().String())) + txs = append(txs, tx) + + option, err := types.NewAnyWithValue(new(evmtypes.ExtensionOptionsEthereumTx)) + if err != nil { + return nil, 0, sdk.Coins{}, err + } + options = append(options, option) } - txGasLimit += tx.GetGas() - cc.log.Info("append", zap.String("hash", tx.Hash().String())) - builder, ok := txb.(authtx.ExtensionOptionsTxBuilder) + builder, ok := txbs[0].(authtx.ExtensionOptionsTxBuilder) if !ok { return nil, 0, sdk.Coins{}, errors.New("unsupported builder") } - - option, err := types.NewAnyWithValue(new(evmtypes.ExtensionOptionsEthereumTx)) - if err != nil { - return nil, 0, sdk.Coins{}, err - } - builder.SetExtensionOptions(option) - builder.SetMsgs(tx) + builder.SetExtensionOptions(options...) + builder.SetMsgs(txs...) builder.SetFeeAmount(fees) builder.SetGasLimit(txGasLimit) txBytes, err := cc.Cdc.TxConfig.TxEncoder()(builder.GetTx()) diff --git a/relayer/chains/penumbra/tx.go b/relayer/chains/penumbra/tx.go index b923a4fb2..ba43e62d3 100644 --- a/relayer/chains/penumbra/tx.go +++ b/relayer/chains/penumbra/tx.go @@ -2283,8 +2283,14 @@ func (cc *PenumbraProvider) MsgSubmitQueryResponse(chainID string, queryID provi panic("implement me") } -func (cc *PenumbraProvider) SendMessagesToMempool(ctx context.Context, msgs []provider.RelayerMessage, memo string, asyncCtx context.Context, asyncCallback []func(*provider.RelayerTxResponse, error)) error { - sendRsp, err := cc.sendMessagesInner(ctx, msgs, memo) +func (cc *PenumbraProvider) SendMessagesToMempool(ctx context.Context, msgs [][]provider.RelayerMessage, memo string, asyncCtx context.Context, asyncCallback []func(*provider.RelayerTxResponse, error)) error { + allMsgs := make([]provider.RelayerMessage, 0) + for _, batch := range msgs { + for _, msg := range batch { + allMsgs = append(allMsgs, msg) + } + } + sendRsp, err := cc.sendMessagesInner(ctx, allMsgs, memo) cc.log.Debug("Received response from sending messages", zap.Any("response", sendRsp), zap.Error(err)) return err } diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index 436faa5b8..10a395747 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -334,7 +334,7 @@ func (mp *messageProcessor) trackAndSendMessages( if maxMsgNum < 2 && maxMsgNum != 0 { maxMsgNum = 2 } - var batch []messageToTrack + var batches [][]messageToTrack for _, t := range mp.trackers() { @@ -350,18 +350,19 @@ func (mp *messageProcessor) trackAndSendMessages( } if broadcastBatch && (retries == 0 || ordered) { - batch = append(batch, t) - if len(batch) >= int(maxMsgNum) && maxMsgNum != 0 { - go mp.sendBatchMessages(ctx, src, dst, batch) - batch = nil + batchCount := len(batches) + if batchCount == 0 || maxMsgNum != 0 && len(batches[batchCount-1]) >= int(maxMsgNum) { + batches = append(batches, make([]messageToTrack, 0)) + batchCount += 1 } + batches[batchCount-1] = append(batches[batchCount-1], t) continue } go mp.sendSingleMessage(ctx, src, dst, t) } - if len(batch) > 0 { - go mp.sendBatchMessages(ctx, src, dst, batch) + if len(batches) > 0 { + go mp.sendBatchMessages(ctx, src, dst, batches) } if mp.assembledCount() > 0 { @@ -393,7 +394,7 @@ func (mp *messageProcessor) sendClientUpdate( msgs := []provider.RelayerMessage{mp.msgUpdateClient} - if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, nil); err != nil { + if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, [][]provider.RelayerMessage{msgs}, mp.memo, ctx, nil); err != nil { mp.log.Error("Error sending client update message", zap.String("path_name", src.info.PathName), zap.String("src_chain_id", src.info.ChainID), @@ -423,57 +424,66 @@ var PathProcMessageCollector chan *PathProcessorMessageResp func (mp *messageProcessor) sendBatchMessages( ctx context.Context, src, dst *pathEndRuntime, - batch []messageToTrack, + batches [][]messageToTrack, ) { broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() var ( - msgs []provider.RelayerMessage - fields []zapcore.Field + batchedMsgs [][]provider.RelayerMessage + fields []zapcore.Field ) - if mp.isLocalhost { - msgs = make([]provider.RelayerMessage, len(batch)) - for i, t := range batch { - msgs[i] = t.assembledMsg() - fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t)) - } - } else { - // messages are batch with appended MsgUpdateClient - msgs = make([]provider.RelayerMessage, 1+len(batch)) - msgs[0] = mp.msgUpdateClient + batchedMsgs = make([][]provider.RelayerMessage, len(batches)) + for i, batch := range batches { + var msgs []provider.RelayerMessage + if mp.isLocalhost { + msgs = make([]provider.RelayerMessage, len(batch)) + for i, t := range batch { + msgs[i] = t.assembledMsg() + fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t)) + } + } else { + // messages are batch with appended MsgUpdateClient + msgs = make([]provider.RelayerMessage, 1+len(batch)) + msgs[0] = mp.msgUpdateClient - for i, t := range batch { - msgs[i+1] = t.assembledMsg() - fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t)) + for i, t := range batch { + msgs[i+1] = t.assembledMsg() + fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t)) + } } + batchedMsgs[i] = msgs } dst.log.Debug("Will relay messages", fields...) callback := func(_ *provider.RelayerTxResponse, err error) { - for _, t := range batch { - dst.finishedProcessing <- t + for _, batch := range batches { + for _, t := range batch { + dst.finishedProcessing <- t + } } // only increment metrics counts for successful packets if err != nil || mp.metrics == nil { return } - for _, tracker := range batch { - t, ok := tracker.(packetMessageToTrack) - if !ok { - continue + for _, batch := range batches { + for _, tracker := range batch { + t, ok := tracker.(packetMessageToTrack) + if !ok { + continue + } + var channel, port string + if t.msg.eventType == chantypes.EventTypeRecvPacket { + channel = t.msg.info.DestChannel + port = t.msg.info.DestPort + } else { + channel = t.msg.info.SourceChannel + port = t.msg.info.SourcePort + } + mp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType) } - var channel, port string - if t.msg.eventType == chantypes.EventTypeRecvPacket { - channel = t.msg.info.DestChannel - port = t.msg.info.DestPort - } else { - channel = t.msg.info.SourceChannel - port = t.msg.info.SourcePort - } - mp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType) } } callbacks := []func(rtr *provider.RelayerTxResponse, err error){callback} @@ -492,9 +502,11 @@ func (mp *messageProcessor) sendBatchMessages( callbacks = append(callbacks, testCallback) } - if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callbacks); err != nil { - for _, t := range batch { - dst.finishedProcessing <- t + if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, batchedMsgs, mp.memo, ctx, callbacks); err != nil { + for _, batch := range batches { + for _, t := range batch { + dst.finishedProcessing <- t + } } errFields := []zapcore.Field{ zap.String("path_name", src.info.PathName), @@ -579,7 +591,7 @@ func (mp *messageProcessor) sendSingleMessage( callbacks = append(callbacks, testCallback) } - err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callbacks) + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, [][]provider.RelayerMessage{msgs}, mp.memo, ctx, callbacks) if err != nil { dst.finishedProcessing <- tracker errFields := []zapcore.Field{ diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index f4376954c..3fadb7a0b 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -389,7 +389,7 @@ type ChainProvider interface { SendMessages(ctx context.Context, msgs []RelayerMessage, memo string) (*RelayerTxResponse, bool, error) SendMessagesToMempool( ctx context.Context, - msgs []RelayerMessage, + msgs [][]RelayerMessage, memo string, asyncCtx context.Context,