tm websocket first pass
agouin committed Nov 20, 2023
1 parent 49b3b2e commit a895911
Showing 4 changed files with 201 additions and 19 deletions.
163 changes: 163 additions & 0 deletions relayer/chains/cosmos/cosmos_chain_processor.go
@@ -18,6 +18,7 @@ import (
"github.com/cosmos/relayer/v2/relayer/provider"

ctypes "github.com/cometbft/cometbft/rpc/core/types"
comettypes "github.com/cometbft/cometbft/types"
"github.com/cosmos/relayer/v2/relayer/chains"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@@ -272,6 +273,10 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui

ccp.log.Debug("Entering main query loop")

if err := ccp.subscribe(ctx); err != nil {
ccp.log.Warn("Error subscribing to websocket, falling back to rpc polling", zap.Error(err))
}

ticker := time.NewTicker(persistence.minQueryLoopDuration)
defer ticker.Stop()

@@ -288,6 +293,164 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
}
}

func (ccp *CosmosChainProcessor) periodicTMVersionCheck(ctx context.Context) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
status, err := ccp.nodeStatusWithRetry(ctx)
if err != nil {
ccp.log.Error(
"Failed to query node status after max attempts",
zap.Uint("attempts", latestHeightQueryRetries),
zap.Error(err),
)
continue
}

ccp.chainProvider.setCometVersion(ccp.log, status.NodeInfo.Version)
}
}
}

func (ccp *CosmosChainProcessor) subscribe(ctx context.Context) error {

// latestHeight, err := ccp.latestHeightWithRetry(ctx)
// if err != nil {
// return err
// }
// ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, latestHeight)
// if err != nil {
// return err
// }

// valSet := ibcHeader.(provider.TendermintIBCHeader).ValidatorSet

headerChan, err := ccp.chainProvider.RPCClient.Subscribe(ctx, "header", comettypes.QueryForEvent(comettypes.EventNewBlockHeader).String())
if err != nil {
return fmt.Errorf("failed to subscribe to blocks over websocket: %w", err)
}

txChan, err := ccp.chainProvider.RPCClient.Subscribe(ctx, "tx", comettypes.QueryForEvent(comettypes.EventTx).String())
if err != nil {
return fmt.Errorf("failed to subscribe to txs over websocket: %w", err)
}

// periodically check the tendermint version of the node
go ccp.periodicTMVersionCheck(ctx)

// valSetChan, err := ccp.chainProvider.RPCClient.Subscribe(ctx, "vals", comettypes.QueryForEvent(comettypes.EventValidatorSetUpdates).String())
// if err != nil {
// return fmt.Errorf("failed to subscribe to blocks over websocket: %w", err)
// }

chainID := ccp.chainProvider.ChainId()

for {
ibcMessagesCache := processor.NewIBCMessagesCache()
ibcHeaderCache := make(processor.IBCHeaderCache)

var latestHeader provider.TendermintIBCHeader

select {
case <-ctx.Done():
return nil
case event := <-headerChan:
headerEvent := event.Data.(comettypes.EventDataNewBlockHeader)

ccp.log.Debug("Received new block header event", zap.Int64("height", headerEvent.Header.Height))

// TODO try to avoid this query by getting the SignedHeader via websocket.
// If we can't, add retry logic.
ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, int64(headerEvent.Header.Height))
if err != nil {
ccp.log.Error("Error querying IBC header",
zap.Int64("height", headerEvent.Header.Height),
zap.Error(err),
)
continue
}

latestHeader = ibcHeader.(provider.TendermintIBCHeader)

heightUint64 := uint64(headerEvent.Header.Height)

blockMsgs := ccp.ibcMessagesFromBlockEvents(
	headerEvent.ResultBeginBlock.Events,
	// Check failure on line 383 (GitHub Actions / build): headerEvent.ResultBeginBlock undefined (type "github.com/cometbft/cometbft/types".EventDataNewBlockHeader has no field or method ResultBeginBlock)
	headerEvent.ResultEndBlock.Events,
	// Check failure on line 384 (GitHub Actions / build): headerEvent.ResultEndBlock undefined (type "github.com/cometbft/cometbft/types".EventDataNewBlockHeader has no field or method ResultEndBlock)
	heightUint64,
	ccp.chainProvider.cometLegacyEncoding,
	// Check failure on line 386 (GitHub Actions / build): too many arguments in call to ccp.ibcMessagesFromBlockEvents
)
for _, m := range blockMsgs {
ccp.handleMessage(ctx, m, ibcMessagesCache)
}

ccp.latestBlock = provider.LatestBlock{
Height: heightUint64,
Time: headerEvent.Header.Time,
}

// latestHeader = provider.TendermintIBCHeader{
// ValidatorSet: valSet,
// SignedHeader: &comettypes.SignedHeader{
// Header: &headerEvent.Header,
// // TODO: how do we get Commit without querying light block?
// // Commit: ,
// },
// }

ibcHeaderCache[heightUint64] = latestHeader
case event := <-txChan:
ccp.log.Debug("Received new tx event")

txEvent := event.Data.(comettypes.EventDataTx)

tx := txEvent.Result
if tx.Code != 0 {
// tx was not successful
continue
}
messages := chains.IbcMessagesFromEvents(ccp.log, tx.Events, chainID, uint64(txEvent.Height), ccp.chainProvider.cometLegacyEncoding)

for _, m := range messages {
ccp.handleMessage(ctx, m, ibcMessagesCache)
}
// case event := <-valSetChan:
// ccp.log.Debug("Received validator set change event")
// vsEvent := event.Data.(comettypes.EventDataValidatorSetUpdates)
// valSet.UpdateWithChangeSet(vsEvent.ValidatorUpdates)
// continue
}

for _, pp := range ccp.pathProcessors {
clientID := pp.RelevantClientID(chainID)
clientState, err := ccp.clientState(ctx, clientID)
if err != nil {
ccp.log.Error("Error fetching client state",
zap.String("client_id", clientID),
zap.Error(err),
)
continue
}

pp.HandleNewData(chainID, processor.ChainProcessorCacheData{
LatestBlock: ccp.latestBlock,
LatestHeader: latestHeader,
IBCMessagesCache: ibcMessagesCache.Clone(),
InSync: ccp.inSync,
ClientState: clientState,
ConnectionStateCache: ccp.connectionStateCache.FilterForClient(clientID),
ChannelStateCache: ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients),
IBCHeaderCache: ibcHeaderCache.Clone(),
})
}
}
}

// initializeConnectionState will bootstrap the connectionStateCache with the open connection state.
func (ccp *CosmosChainProcessor) initializeConnectionState(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, queryStateTimeout)
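The subscribe function above uses CometBFT's standard RPC websocket API: Subscribe takes a subscriber label plus a query string built with QueryForEvent and returns a channel of events. A minimal standalone sketch of that pattern for context — the node address, websocket endpoint, and subscriber name are illustrative assumptions, not values from the relayer:

package main

import (
	"context"
	"fmt"
	"log"

	rpchttp "github.com/cometbft/cometbft/rpc/client/http"
	comettypes "github.com/cometbft/cometbft/types"
)

func main() {
	// Assumed local node; any reachable CometBFT RPC address works.
	client, err := rpchttp.New("http://localhost:26657", "/websocket")
	if err != nil {
		log.Fatal(err)
	}
	// The client service must be started before Subscribe can open the websocket.
	if err := client.Start(); err != nil {
		log.Fatal(err)
	}
	defer client.Stop()

	ctx := context.Background()

	// Same query construction as the chain processor's subscribe above.
	headers, err := client.Subscribe(ctx, "example-subscriber",
		comettypes.QueryForEvent(comettypes.EventNewBlockHeader).String())
	if err != nil {
		log.Fatal(err)
	}

	for event := range headers {
		header := event.Data.(comettypes.EventDataNewBlockHeader)
		fmt.Println("new block header at height", header.Header.Height)
	}
}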
32 changes: 24 additions & 8 deletions relayer/chains/cosmos/provider.go
@@ -150,8 +150,30 @@ type CosmosProvider struct {
}

type WalletState struct {
NextAccountSequence uint64
Mu sync.Mutex
nextAccountSequence uint64
mu sync.RWMutex
}

func (ws *WalletState) UpdateNextAccountSequence(seq uint64) {
ws.mu.Lock()
defer ws.mu.Unlock()
if seq > ws.nextAccountSequence {
ws.nextAccountSequence = seq
}
}

func (ws *WalletState) SetNextAccountSequence(seq uint64) {
	ws.mu.Lock()
	defer ws.mu.Unlock()
	ws.nextAccountSequence = seq
}

func (ws *WalletState) NextAccountSequence() uint64 {
ws.mu.RLock()
defer ws.mu.RUnlock()
return ws.nextAccountSequence
}

func (cc *CosmosProvider) ProviderConfig() provider.ProviderConfig {
@@ -349,12 +371,6 @@ func (cc *CosmosProvider) SetMetrics(m *processor.PrometheusMetrics) {
cc.metrics = m
}

func (cc *CosmosProvider) updateNextAccountSequence(sequenceGuard *WalletState, seq uint64) {
if seq > sequenceGuard.NextAccountSequence {
sequenceGuard.NextAccountSequence = seq
}
}

func (cc *CosmosProvider) setCometVersion(log *zap.Logger, version string) {
cc.cometLegacyEncoding = cc.legacyEncodedEvents(log, version)
}
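The reworked WalletState keeps its mutex unexported and routes all access through methods, so callers can no longer hold the lock across an entire broadcast (see the tx.go changes below). A minimal concurrent sketch of the same monotonic-guard pattern — a standalone reimplementation for illustration, not the relayer's code:

package main

import (
	"fmt"
	"sync"
)

// walletState mirrors the pattern above: a monotonically advancing
// sequence guarded by an RWMutex, zero value ready to use.
type walletState struct {
	nextAccountSequence uint64
	mu                  sync.RWMutex
}

// updateNextAccountSequence only ever moves the sequence forward.
func (ws *walletState) updateNextAccountSequence(seq uint64) {
	ws.mu.Lock()
	defer ws.mu.Unlock()
	if seq > ws.nextAccountSequence {
		ws.nextAccountSequence = seq
	}
}

func (ws *walletState) next() uint64 {
	ws.mu.RLock()
	defer ws.mu.RUnlock()
	return ws.nextAccountSequence
}

func main() {
	ws := &walletState{}
	var wg sync.WaitGroup
	// Racing, out-of-order updates: the guard converges on the max.
	for i := 1; i <= 100; i++ {
		wg.Add(1)
		go func(seq uint64) {
			defer wg.Done()
			ws.updateNextAccountSequence(seq)
		}(uint64(i))
	}
	wg.Wait()
	fmt.Println(ws.next()) // prints 100
}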
15 changes: 7 additions & 8 deletions relayer/chains/cosmos/tx.go
@@ -167,8 +167,6 @@ func (cc *CosmosProvider) SendMessagesToMempool(
}

sequenceGuard := ensureSequenceGuard(cc, txSignerKey)
sequenceGuard.Mu.Lock()
defer sequenceGuard.Mu.Unlock()

txBytes, sequence, fees, err := cc.buildMessages(ctx, msgs, memo, 0, txSignerKey, feegranterKey, sequenceGuard)
if err != nil {
Expand All @@ -188,8 +186,8 @@ func (cc *CosmosProvider) SendMessagesToMempool(
return err
}

// we had a successful tx broadcast with this sequence, so update it to the next
cc.updateNextAccountSequence(sequenceGuard, sequence+1)
sequenceGuard.UpdateNextAccountSequence(sequence + 1)

return nil
}

@@ -615,9 +613,10 @@ func (cc *CosmosProvider) buildMessages(
}

sequence = txf.Sequence()
cc.updateNextAccountSequence(sequenceGuard, sequence)
if sequence < sequenceGuard.NextAccountSequence {
sequence = sequenceGuard.NextAccountSequence
sequenceGuard.UpdateNextAccountSequence(sequence)
nas := sequenceGuard.NextAccountSequence()
if sequence < nas {
sequence = nas
txf = txf.WithSequence(sequence)
}

@@ -682,7 +681,7 @@ func (cc *CosmosProvider) handleAccountSequenceMismatchError(sequenceGuard *Wall
if err != nil {
return
}
sequenceGuard.NextAccountSequence = nextSeq
sequenceGuard.SetNextAccountSequence(nextSeq)
}

// MsgCreateClient creates an sdk.Msg to update the client on src with consensus state from dst
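The buildMessages change above reconciles the chain-reported sequence with the locally tracked one: the guard is first advanced to the reported value, then the higher of the two is used for signing, so sequences consumed by txs still in the mempool are never reused. Condensed into one hypothetical helper — using the WalletState methods from provider.go above; this function does not exist in the relayer:

// chooseSequence sketches the reconciliation in buildMessages: seed the
// guard with what the chain reports, then never sign below the locally
// tracked next sequence, which may be ahead while txs sit in the mempool.
func chooseSequence(chainReported uint64, guard *WalletState) uint64 {
	guard.UpdateNextAccountSequence(chainReported)
	if nas := guard.NextAccountSequence(); chainReported < nas {
		return nas
	}
	return chainReported
}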
10 changes: 7 additions & 3 deletions relayer/processor/path_end_runtime.go
@@ -375,7 +375,9 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func()
pathEnd.lastClientUpdateHeightMu.Unlock()

pathEnd.inSync = d.InSync
pathEnd.latestHeader = d.LatestHeader
if d.LatestHeader != nil {
pathEnd.latestHeader = d.LatestHeader
}
pathEnd.clientState = d.ClientState

terminate, err := pathEnd.checkForMisbehaviour(ctx, pathEnd.clientState, counterParty)
@@ -407,8 +409,10 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func()

pathEnd.mergeMessageCache(d.IBCMessagesCache, counterpartyChainID, pathEnd.inSync && counterpartyInSync) // Merge incoming packet IBC messages into the backlog

pathEnd.ibcHeaderCache.Merge(d.IBCHeaderCache) // Update latest IBC header state
pathEnd.ibcHeaderCache.Prune(ibcHeadersToCache) // Only keep most recent IBC headers
if d.IBCHeaderCache != nil {
pathEnd.ibcHeaderCache.Merge(d.IBCHeaderCache) // Update latest IBC header state
pathEnd.ibcHeaderCache.Prune(ibcHeadersToCache) // Only keep most recent IBC headers
}
}

// shouldSendPacketMessage determines if the packet flow message should be sent now.
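The nil checks above matter because the websocket tx path can deliver cache data that carries IBC messages but no header information; assigning unconditionally would clobber the last good header state. A simplified sketch of the guarded merge, with stand-in types — illustrative only, not the processor's actual types:

// mergePartial keeps existing state when an incoming partial update
// omits a field (nil), mirroring the guards in mergeCacheData above.
type header struct{ height uint64 }

type cacheData struct {
	latestHeader   *header           // stand-in for the latest IBC header
	ibcHeaderCache map[uint64]header // stand-in for processor.IBCHeaderCache
}

func mergePartial(current *cacheData, incoming cacheData) {
	if incoming.latestHeader != nil {
		current.latestHeader = incoming.latestHeader
	}
	if incoming.ibcHeaderCache != nil {
		for h, hdr := range incoming.ibcHeaderCache {
			current.ibcHeaderCache[h] = hdr
		}
	}
}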
