Problem: packed batch is not supported
mmsqe committed Jun 17, 2024
1 parent 63b64b0 commit 0e29383
Showing 4 changed files with 150 additions and 110 deletions.
150 changes: 86 additions & 64 deletions relayer/chains/cosmos/tx.go
@@ -151,7 +151,7 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela
wg.Add(1)

if err := retry.Do(func() error {
return cc.SendMessagesToMempool(ctx, msgs, memo, ctx, []func(*provider.RelayerTxResponse, error){callback})
return cc.SendMessagesToMempool(ctx, [][]provider.RelayerMessage{msgs}, memo, ctx, []func(*provider.RelayerTxResponse, error){callback})
}, retry.Context(ctx), rtyAtt, rtyDel, rtyErr, retry.OnRetry(func(n uint, err error) {
cc.log.Info(
"Error building or broadcasting transaction",
@@ -200,36 +200,52 @@ type jsonrpcMessage struct {
// If there is no error broadcasting, the asyncCallback will be called with success/failure of the wait for block inclusion.
func (cc *CosmosProvider) SendMessagesToMempool(
ctx context.Context,
msgs []provider.RelayerMessage,
msgs [][]provider.RelayerMessage,
memo string,

asyncCtx context.Context,
asyncCallbacks []func(*provider.RelayerTxResponse, error),
) error {
txSignerKey, feegranterKeyOrAddr, err := cc.buildSignerConfig(msgs)
if err != nil {
return err
}
var txf *tx.Factory
var txbs []client.TxBuilder
var sequenceGuard *WalletState
done := cc.SetSDKContext()
dynamicFee := cc.DynamicFee(ctx)
var txSignerKey string
var feegranterKeyOrAddr string
var err error
for _, batch := range msgs {
txSignerKey, feegranterKeyOrAddr, err = cc.buildSignerConfig(batch)
if err != nil {
return err
}

sequenceGuard := ensureSequenceGuard(cc, txSignerKey)
sequenceGuard.Mu.Lock()
defer sequenceGuard.Mu.Unlock()
if sequenceGuard == nil {
sequenceGuard = ensureSequenceGuard(cc, txSignerKey)
sequenceGuard.Mu.Lock()
defer sequenceGuard.Mu.Unlock()
}

dynamicFee := cc.DynamicFee(ctx)
done := cc.SetSDKContext()
txf, txb, err := cc.getTxFactoryAndBuilder(ctx, msgs, memo, 0, txSignerKey, feegranterKeyOrAddr, sequenceGuard, dynamicFee)
if err != nil {
done()
return err
f, txb, err := cc.getTxFactoryAndBuilder(ctx, batch, memo, 0, txSignerKey, feegranterKeyOrAddr, sequenceGuard, dynamicFee)
if err != nil {
done()
return err
}
if txf == nil {
txf = f
} else {
f := txf.WithGas(txf.Gas() + f.Gas())
txf = &f
}
txbs = append(txbs, txb)
}

var txBytes []byte
var sequence uint64
var fees sdk.Coins
if len(cc.PCfg.PrecompiledContractAddress) > 0 {
txBytes, sequence, fees, err = cc.buildEvmMessages(ctx, txf, txb)
txBytes, sequence, fees, err = cc.buildEvmMessages(ctx, txf, txbs)
} else {
txBytes, sequence, fees, err = cc.buildMessages(ctx, txf, txb, txSignerKey)
txBytes, sequence, fees, err = cc.buildMessages(ctx, txf, txbs[0], txSignerKey)
}
done()
if err != nil {
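
The loop above folds the per-batch factories into one by summing their gas estimates. A self-contained sketch of that pattern with a stand-in factory type (the real tx.Factory exposes Gas and WithGas the same way):

package main

import "fmt"

// Stand-in for tx.Factory's Gas()/WithGas() pair, used only to illustrate the merge.
type factory struct{ gas uint64 }

func (f factory) Gas() uint64              { return f.gas }
func (f factory) WithGas(g uint64) factory { f.gas = g; return f }

// mergeFactories mirrors the loop above: the first batch seeds the factory,
// each later batch only contributes its estimated gas to the combined limit.
func mergeFactories(perBatch []factory) *factory {
	var txf *factory
	for _, f := range perBatch {
		if txf == nil {
			cp := f
			txf = &cp
			continue
		}
		merged := txf.WithGas(txf.Gas() + f.Gas())
		txf = &merged
	}
	return txf
}

func main() {
	combined := mergeFactories([]factory{{gas: 120000}, {gas: 80000}, {gas: 50000}})
	fmt.Println(combined.Gas()) // 250000
}
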
@@ -244,7 +260,7 @@ func (cc *CosmosProvider) SendMessagesToMempool(
err = cc.broadcastTx(
ctx,
txBytes,
msgs,
msgs[0], //TODO
fees,
asyncCtx,
defaultBroadcastWaitTimeout,
@@ -868,7 +884,7 @@ func extractSigners(msg proto.Message) []sdk.AccAddress {
func (cc *CosmosProvider) buildEvmMessages(
ctx context.Context,
txf *tx.Factory,
txb client.TxBuilder,
txbs []client.TxBuilder,
) ([]byte, uint64, sdk.Coins, error) {
chainID, err := ethermintcodecs.ParseChainID(cc.PCfg.ChainID)
if err != nil {
@@ -895,56 +911,62 @@ func (cc *CosmosProvider) buildEvmMessages(
contractAddress := common.HexToAddress(cc.PCfg.PrecompiledContractAddress)
blockNumber := new(big.Int)

msgs := txb.GetTx().GetMsgs()
signers := extractSigners(msgs[0])
if len(signers) != 1 {
return nil, 0, sdk.Coins{}, sdkerrors.Wrapf(legacyerrors.ErrUnknownRequest, "invalid signers length %d", len(signers))
}
from, err := convertAddress(signers[0].String())
if err != nil {
return nil, 0, sdk.Coins{}, err
}
data, err := cc.getPayloads(contractAddress, msgs)
if err != nil {
return nil, 0, sdk.Coins{}, err
}
nonce := txf.Sequence()
amount := big.NewInt(0)
tx := evmtypes.NewTx(
chainID, nonce, &contractAddress, amount,
gasLimit, gasPrice, gasFeeCap, gasTipCap,
data, &ethtypes.AccessList{})
tx.From = from.Bytes()
if err := tx.ValidateBasic(); err != nil {
cc.log.Info("tx failed basic validation", zap.Error(err))
return nil, 0, sdk.Coins{}, err
}
if err := retry.Do(func() error {
signer := ethtypes.MakeSigner(getChainConfig(chainID), blockNumber)
if err := tx.Sign(signer, cc.Keybase); err != nil {
return err
options := make([]*types.Any, 0)
txs := make([]proto.Message, 0)
for _, txb := range txbs {
msgs := txb.GetTx().GetMsgs()
signers := extractSigners(msgs[0])
if len(signers) != 1 {
return nil, 0, sdk.Coins{}, sdkerrors.Wrapf(legacyerrors.ErrUnknownRequest, "invalid signers length %d", len(signers))
}
return nil
}, retry.Context(ctx), rtyAtt, rtyDel, rtyErr); err != nil {
return nil, 0, sdk.Coins{}, err
}
if feeAmt := sdkmath.NewIntFromBigInt(tx.GetFee()); feeAmt.Sign() > 0 {
fees = fees.Add(sdk.NewCoin(gasPriceCoin.Denom, feeAmt))
from, err := convertAddress(signers[0].String())
if err != nil {
return nil, 0, sdk.Coins{}, err
}
data, err := cc.getPayloads(contractAddress, msgs)
if err != nil {
return nil, 0, sdk.Coins{}, err
}
nonce := txf.Sequence()
amount := big.NewInt(0)
tx := evmtypes.NewTx(
chainID, nonce, &contractAddress, amount,
gasLimit, gasPrice, gasFeeCap, gasTipCap,
data, &ethtypes.AccessList{})
tx.From = from.Bytes()
if err := tx.ValidateBasic(); err != nil {
cc.log.Info("tx failed basic validation", zap.Error(err))
return nil, 0, sdk.Coins{}, err
}
if err := retry.Do(func() error {
signer := ethtypes.MakeSigner(getChainConfig(chainID), blockNumber)
if err := tx.Sign(signer, cc.Keybase); err != nil {
return err
}
return nil
}, retry.Context(ctx), rtyAtt, rtyDel, rtyErr); err != nil {
return nil, 0, sdk.Coins{}, err
}
if feeAmt := sdkmath.NewIntFromBigInt(tx.GetFee()); feeAmt.Sign() > 0 {
fees = fees.Add(sdk.NewCoin(gasPriceCoin.Denom, feeAmt))
}
txGasLimit += tx.GetGas()
cc.log.Info("append", zap.String("hash", tx.Hash().String()))
txs = append(txs, tx)

option, err := types.NewAnyWithValue(new(evmtypes.ExtensionOptionsEthereumTx))
if err != nil {
return nil, 0, sdk.Coins{}, err
}
options = append(options, option)
}
txGasLimit += tx.GetGas()
cc.log.Info("append", zap.String("hash", tx.Hash().String()))

builder, ok := txb.(authtx.ExtensionOptionsTxBuilder)
builder, ok := txbs[0].(authtx.ExtensionOptionsTxBuilder)
if !ok {
return nil, 0, sdk.Coins{}, errors.New("unsupported builder")
}

option, err := types.NewAnyWithValue(new(evmtypes.ExtensionOptionsEthereumTx))
if err != nil {
return nil, 0, sdk.Coins{}, err
}
builder.SetExtensionOptions(option)
builder.SetMsgs(tx)
builder.SetExtensionOptions(options...)
builder.SetMsgs(txs...)
builder.SetFeeAmount(fees)
builder.SetGasLimit(txGasLimit)
txBytes, err := cc.Cdc.TxConfig.TxEncoder()(builder.GetTx())
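
As a rough, self-contained model of what buildEvmMessages now assembles (stand-in types, not the SDK's): each batch becomes one signed Ethereum message, and all of them are packed onto a single builder whose gas limit and extension options accumulate across batches.

package main

import "fmt"

// Stand-ins: each batch of relayer messages becomes one call payload to the
// precompiled contract, signed as one Ethereum message; all of those messages
// are then set on a single Cosmos transaction builder.
type relayerMsg string

type ethereumMsg struct {
	payloads []relayerMsg
	gas      uint64
}

type packedTx struct {
	msgs     []ethereumMsg
	gasLimit uint64
	options  int // one ExtensionOptionsEthereumTx per packed Ethereum message
}

// packBatches assumes a flat per-payload gas figure purely for illustration;
// in the diff above the gas comes from the merged tx.Factory.
func packBatches(batches [][]relayerMsg, gasPerPayload uint64) packedTx {
	var out packedTx
	for _, batch := range batches {
		m := ethereumMsg{payloads: batch, gas: gasPerPayload * uint64(len(batch))}
		out.msgs = append(out.msgs, m)
		out.gasLimit += m.gas
		out.options++
	}
	return out
}

func main() {
	tx := packBatches([][]relayerMsg{
		{"MsgUpdateClient", "MsgRecvPacket"},
		{"MsgAcknowledgement"},
	}, 100000)
	fmt.Println(len(tx.msgs), tx.gasLimit, tx.options) // 2 300000 2
}
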
10 changes: 8 additions & 2 deletions relayer/chains/penumbra/tx.go
@@ -2283,8 +2283,14 @@ func (cc *PenumbraProvider) MsgSubmitQueryResponse(chainID string, queryID provi
panic("implement me")
}

func (cc *PenumbraProvider) SendMessagesToMempool(ctx context.Context, msgs []provider.RelayerMessage, memo string, asyncCtx context.Context, asyncCallback []func(*provider.RelayerTxResponse, error)) error {
sendRsp, err := cc.sendMessagesInner(ctx, msgs, memo)
func (cc *PenumbraProvider) SendMessagesToMempool(ctx context.Context, msgs [][]provider.RelayerMessage, memo string, asyncCtx context.Context, asyncCallback []func(*provider.RelayerTxResponse, error)) error {
allMsgs := make([]provider.RelayerMessage, 0)
for _, batch := range msgs {
for _, msg := range batch {
allMsgs = append(allMsgs, msg)
}
}
sendRsp, err := cc.sendMessagesInner(ctx, allMsgs, memo)
cc.log.Debug("Received response from sending messages", zap.Any("response", sendRsp), zap.Error(err))
return err
}
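
The Penumbra provider does not pack batches; it only flattens the nested slices back into a single list. An equivalent, slightly shorter sketch using the variadic append idiom:

allMsgs := make([]provider.RelayerMessage, 0)
for _, batch := range msgs {
	allMsgs = append(allMsgs, batch...)
}
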
98 changes: 55 additions & 43 deletions relayer/processor/message_processor.go
@@ -334,7 +334,7 @@ func (mp *messageProcessor) trackAndSendMessages(
if maxMsgNum < 2 && maxMsgNum != 0 {
maxMsgNum = 2
}
var batch []messageToTrack
var batches [][]messageToTrack

for _, t := range mp.trackers() {

@@ -350,18 +350,19 @@
}

if broadcastBatch && (retries == 0 || ordered) {
batch = append(batch, t)
if len(batch) >= int(maxMsgNum) && maxMsgNum != 0 {
go mp.sendBatchMessages(ctx, src, dst, batch)
batch = nil
batchCount := len(batches)
if batchCount == 0 || maxMsgNum != 0 && len(batches[batchCount-1]) >= int(maxMsgNum) {
batches = append(batches, make([]messageToTrack, 0))
batchCount += 1
}
batches[batchCount-1] = append(batches[batchCount-1], t)
continue
}
go mp.sendSingleMessage(ctx, src, dst, t)
}

if len(batch) > 0 {
go mp.sendBatchMessages(ctx, src, dst, batch)
if len(batches) > 0 {
go mp.sendBatchMessages(ctx, src, dst, batches)
}

if mp.assembledCount() > 0 {
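
trackAndSendMessages now accumulates trackers into capped batches instead of flushing one batch at a time. A self-contained sketch of the chunking rule above, with ints standing in for messageToTrack (a maxMsgNum of 0 means no per-batch cap):

package main

import "fmt"

// appendToBatches appends t to the last batch, opening a new batch when the
// last one has reached maxMsgNum (a cap of 0 means unlimited).
func appendToBatches(batches [][]int, t int, maxMsgNum uint64) [][]int {
	n := len(batches)
	if n == 0 || (maxMsgNum != 0 && uint64(len(batches[n-1])) >= maxMsgNum) {
		batches = append(batches, nil)
		n++
	}
	batches[n-1] = append(batches[n-1], t)
	return batches
}

func main() {
	var batches [][]int
	for t := 1; t <= 5; t++ {
		batches = appendToBatches(batches, t, 2)
	}
	fmt.Println(batches) // [[1 2] [3 4] [5]]
}
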
@@ -393,7 +394,7 @@ func (mp *messageProcessor) sendClientUpdate(

msgs := []provider.RelayerMessage{mp.msgUpdateClient}

if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, nil); err != nil {
if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, [][]provider.RelayerMessage{msgs}, mp.memo, ctx, nil); err != nil {
mp.log.Error("Error sending client update message",
zap.String("path_name", src.info.PathName),
zap.String("src_chain_id", src.info.ChainID),
@@ -423,57 +424,66 @@ var PathProcMessageCollector chan *PathProcessorMessageResp
func (mp *messageProcessor) sendBatchMessages(
ctx context.Context,
src, dst *pathEndRuntime,
batch []messageToTrack,
batches [][]messageToTrack,
) {
broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout)
defer cancel()

var (
msgs []provider.RelayerMessage
fields []zapcore.Field
batchedMsgs [][]provider.RelayerMessage
fields []zapcore.Field
)

if mp.isLocalhost {
msgs = make([]provider.RelayerMessage, len(batch))
for i, t := range batch {
msgs[i] = t.assembledMsg()
fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t))
}
} else {
// messages are batch with appended MsgUpdateClient
msgs = make([]provider.RelayerMessage, 1+len(batch))
msgs[0] = mp.msgUpdateClient
batchedMsgs = make([][]provider.RelayerMessage, len(batches))
for i, batch := range batches {
var msgs []provider.RelayerMessage
if mp.isLocalhost {
msgs = make([]provider.RelayerMessage, len(batch))
for i, t := range batch {
msgs[i] = t.assembledMsg()
fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t))
}
} else {
// messages are batch with appended MsgUpdateClient
msgs = make([]provider.RelayerMessage, 1+len(batch))
msgs[0] = mp.msgUpdateClient

for i, t := range batch {
msgs[i+1] = t.assembledMsg()
fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t))
for i, t := range batch {
msgs[i+1] = t.assembledMsg()
fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t))
}
}
batchedMsgs[i] = msgs
}

dst.log.Debug("Will relay messages", fields...)

callback := func(_ *provider.RelayerTxResponse, err error) {
for _, t := range batch {
dst.finishedProcessing <- t
for _, batch := range batches {
for _, t := range batch {
dst.finishedProcessing <- t
}
}
// only increment metrics counts for successful packets
if err != nil || mp.metrics == nil {
return
}
for _, tracker := range batch {
t, ok := tracker.(packetMessageToTrack)
if !ok {
continue
for _, batch := range batches {
for _, tracker := range batch {
t, ok := tracker.(packetMessageToTrack)
if !ok {
continue
}
var channel, port string
if t.msg.eventType == chantypes.EventTypeRecvPacket {
channel = t.msg.info.DestChannel
port = t.msg.info.DestPort
} else {
channel = t.msg.info.SourceChannel
port = t.msg.info.SourcePort
}
mp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType)
}
var channel, port string
if t.msg.eventType == chantypes.EventTypeRecvPacket {
channel = t.msg.info.DestChannel
port = t.msg.info.DestPort
} else {
channel = t.msg.info.SourceChannel
port = t.msg.info.SourcePort
}
mp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType)
}
}
callbacks := []func(rtr *provider.RelayerTxResponse, err error){callback}
@@ -492,9 +502,11 @@ func (mp *messageProcessor) sendBatchMessages(
callbacks = append(callbacks, testCallback)
}

if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callbacks); err != nil {
for _, t := range batch {
dst.finishedProcessing <- t
if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, batchedMsgs, mp.memo, ctx, callbacks); err != nil {
for _, batch := range batches {
for _, t := range batch {
dst.finishedProcessing <- t
}
}
errFields := []zapcore.Field{
zap.String("path_name", src.info.PathName),
@@ -579,7 +591,7 @@ func (mp *messageProcessor) sendSingleMessage(
callbacks = append(callbacks, testCallback)
}

err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callbacks)
err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, [][]provider.RelayerMessage{msgs}, mp.memo, ctx, callbacks)
if err != nil {
dst.finishedProcessing <- tracker
errFields := []zapcore.Field{
2 changes: 1 addition & 1 deletion relayer/provider/provider.go
@@ -389,7 +389,7 @@ type ChainProvider interface {
SendMessages(ctx context.Context, msgs []RelayerMessage, memo string) (*RelayerTxResponse, bool, error)
SendMessagesToMempool(
ctx context.Context,
msgs []RelayerMessage,
msgs [][]RelayerMessage,
memo string,

asyncCtx context.Context,
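
Read together with the Cosmos implementation earlier in this diff, the updated interface method ends up with this shape (a sketch; only the msgs parameter changes in this commit, and the trailing parameter names follow the implementation above):

SendMessagesToMempool(
	ctx context.Context,
	msgs [][]RelayerMessage,
	memo string,

	asyncCtx context.Context,
	asyncCallbacks []func(*RelayerTxResponse, error),
) error
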
