parallelize query
agouin committed Nov 21, 2023
1 parent c6befda commit de6e159
Showing 2 changed files with 96 additions and 58 deletions.
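At its core, the commit applies a standard Go fan-out/join: the per-block QueryIBCHeader RPC moves into an errgroup.Group goroutine so it runs concurrently with block-event parsing, and its result is collected with eg.Wait() just before the data is handed to the path processors. A minimal, self-contained sketch of that pattern, with an illustrative queryHeader standing in for the real RPC:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// queryHeader stands in for the slow per-block RPC (QueryIBCHeader in the diff).
func queryHeader(ctx context.Context, height int64) (string, error) {
	time.Sleep(50 * time.Millisecond) // simulate network latency
	return fmt.Sprintf("header@%d", height), nil
}

func main() {
	ctx := context.Background()

	var eg errgroup.Group
	var header string

	// Kick off the query in the background instead of blocking the event loop.
	eg.Go(func() error {
		h, err := queryHeader(ctx, 100)
		if err != nil {
			return err
		}
		header = h // safe: read only after eg.Wait() establishes happens-before
		return nil
	})

	// ... parse block and tx events here, concurrently with the query ...

	// Join just before the result is actually needed.
	if err := eg.Wait(); err != nil {
		fmt.Println("warn: header query failed:", err)
	} else {
		fmt.Println("got", header)
	}
}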
138 changes: 85 additions & 53 deletions relayer/chains/cosmos/cosmos_chain_processor.go
@@ -26,7 +26,7 @@ import (
)

// Used by the query cycle to attempt to switch to websocket connection once chains are in sync
var switchToSubscribeErr = errors.New("switching to subscribe")
var errSwitchToSubscribe = errors.New("switching to subscribe")

type CosmosChainProcessor struct {
log *zap.Logger
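The rename to errSwitchToSubscribe follows the conventional errXxx naming for Go sentinel errors, and Run matches it with errors.Is, which keeps working even if the sentinel is later wrapped. A tiny illustration of the pattern, with a hypothetical cycle function:

package main

import (
	"errors"
	"fmt"
)

var errSwitchToSubscribe = errors.New("switching to subscribe")

// cycle stands in for queryCycle; wrapping with %w preserves the sentinel.
func cycle() error {
	return fmt.Errorf("query cycle finished: %w", errSwitchToSubscribe)
}

func main() {
	if err := cycle(); errors.Is(err, errSwitchToSubscribe) {
		fmt.Println("attempting to switch to websocket mode")
	}
}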
@@ -282,7 +282,8 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui

for {
if err := ccp.queryCycle(ctx, &persistence, stuckPacket); err != nil {
if errors.Is(err, switchToSubscribeErr) {
if errors.Is(err, errSwitchToSubscribe) {
ccp.log.Debug("Attempting to switch to websocket mode")
if ccp.chainProvider.cometLegacyBlockResults {
if err := ccp.subscribeLegacy(ctx); err != nil {
ccp.log.Error("Error subscribing to legacy websocket", zap.Error(err))
@@ -330,6 +331,8 @@ func (ccp *CosmosChainProcessor) subscribeLegacy(ctx context.Context) error {
var latestHeader provider.IBCHeader
var isBlockEvent bool

var eg errgroup.Group

select {
case <-ctx.Done():
return nil
@@ -339,22 +342,21 @@ func (ccp *CosmosChainProcessor) subscribeLegacy(ctx context.Context) error {

ccp.log.Debug("Received new legacy block event", zap.Int64("height", blockEvent.Block.Height))

// TODO try to avoid this query by getting the SignedHeader via websocket.
// If we can't, add retry logic.
ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, blockEvent.Block.Height)
if err != nil {
ccp.log.Error("Error querying IBC header",
zap.Int64("height", blockEvent.Block.Height),
zap.Error(err),
)
continue
}
heightUint64 := uint64(blockEvent.Block.Height)

latestHeader = ibcHeader.(provider.TendermintIBCHeader)
eg.Go(func() (err error) {
// TODO try to avoid this query by getting the SignedHeader via websocket.
// If we can't, add retry logic.
ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, blockEvent.Block.Height)
if err != nil {
return err
}

heightUint64 := uint64(blockEvent.Block.Height)
latestHeader = ibcHeader.(provider.TendermintIBCHeader)
ibcHeaderCache[heightUint64] = latestHeader

ibcHeaderCache[heightUint64] = latestHeader
return nil
})

beginBlockMsgs := ccp.ibcMessagesFromBlockEvents(
chains.ConvertEvents(blockEvent.ResultBeginBlock.Events),
@@ -378,10 +380,11 @@ func (ccp *CosmosChainProcessor) subscribeLegacy(ctx context.Context) error {
Height: heightUint64,
Time: blockEvent.Block.Time,
}

case event := <-txChan:
txEvent := event.Data.(legacycomettypes.EventDataTx)

ccp.log.Debug("Received new legacy tx event", zap.Int64("height", txEvent.Height))

tx := txEvent.TxResult.Result
if tx.Code != 0 {
// tx was not successful
@@ -397,27 +400,40 @@ func (ccp *CosmosChainProcessor) subscribeLegacy(ctx context.Context) error {
for _, pp := range ccp.pathProcessors {
clientID := pp.RelevantClientID(chainID)
var clientState provider.ClientState

newData := processor.ChainProcessorCacheData{
IBCMessagesCache: ibcMessagesCache.Clone(),
InSync: true,
}

if isBlockEvent {
newData.LatestBlock = ccp.latestBlock
newData.ConnectionStateCache = ccp.connectionStateCache.FilterForClient(clientID)
newData.ChannelStateCache = ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients)

if err := eg.Wait(); err != nil {
ccp.log.Warn(
"Error querying IBC header",
zap.Uint64("height", ccp.latestBlock.Height),
zap.Error(err),
)
} else {
newData.LatestHeader = latestHeader
newData.IBCHeaderCache = ibcHeaderCache.Clone()
}

clientState, err = ccp.clientState(ctx, clientID)
if err != nil {
ccp.log.Error("Error fetching client state",
zap.String("client_id", clientID),
zap.Error(err),
)
continue
} else {
newData.ClientState = clientState
}
}

pp.HandleNewData(chainID, processor.ChainProcessorCacheData{
LatestBlock: ccp.latestBlock,
LatestHeader: latestHeader,
IBCMessagesCache: ibcMessagesCache.Clone(),
InSync: true,
ClientState: clientState,
ConnectionStateCache: ccp.connectionStateCache.FilterForClient(clientID),
ChannelStateCache: ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients),
IBCHeaderCache: ibcHeaderCache.Clone(),
})
pp.HandleNewData(chainID, newData)
}
}
}
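Worth noting in the hunk above: eg.Wait() gates only LatestHeader and IBCHeaderCache, so a failed header query downgrades the published snapshot (a Warn log, no fresh header) rather than dropping the round's messages and state caches. A condensed sketch of that gating, using a deliberately reduced stand-in for ChainProcessorCacheData:

package main

import (
	"errors"
	"fmt"
)

// cacheData is a reduced, hypothetical stand-in for ChainProcessorCacheData.
type cacheData struct {
	inSync       bool
	latestHeader string // empty means "no fresh header this round"
}

// buildData publishes a degraded snapshot when the parallel query failed,
// rather than skipping the round entirely.
func buildData(wait func() error, header string) cacheData {
	d := cacheData{inSync: true}
	if err := wait(); err != nil {
		fmt.Println("warn: error querying IBC header:", err)
	} else {
		d.latestHeader = header
	}
	return d
}

func main() {
	ok := func() error { return nil }
	bad := func() error { return errors.New("rpc timeout") }
	fmt.Printf("%+v\n", buildData(ok, "header@101"))  // header included
	fmt.Printf("%+v\n", buildData(bad, "header@102")) // header omitted
}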
@@ -439,7 +455,9 @@ func (ccp *CosmosChainProcessor) subscribe(ctx context.Context) error {
ibcMessagesCache := processor.NewIBCMessagesCache()
ibcHeaderCache := make(processor.IBCHeaderCache)

var latestHeader provider.TendermintIBCHeader
var latestHeader provider.IBCHeader

var eg errgroup.Group

select {
case <-ctx.Done():
@@ -449,20 +467,21 @@

ccp.log.Debug("Received new block event", zap.Int64("height", blockEvent.Block.Height))

// TODO try to avoid this query by getting the SignedHeader via websocket.
// If we can't, add retry logic.
ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, blockEvent.Block.Height)
if err != nil {
ccp.log.Error("Error querying IBC header",
zap.Int64("height", blockEvent.Block.Height),
zap.Error(err),
)
continue
}
heightUint64 := uint64(blockEvent.Block.Height)

latestHeader = ibcHeader.(provider.TendermintIBCHeader)
eg.Go(func() (err error) {
// TODO try to avoid this query by getting the SignedHeader via websocket.
// If we can't, add retry logic.
ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, blockEvent.Block.Height)
if err != nil {
return err
}

heightUint64 := uint64(blockEvent.Block.Height)
latestHeader = ibcHeader.(provider.TendermintIBCHeader)
ibcHeaderCache[heightUint64] = latestHeader

return nil
})

blockMsgs := ccp.ibcMessagesFromBlockEvents(
blockEvent.ResultFinalizeBlock.Events,
@@ -495,25 +514,38 @@

for _, pp := range ccp.pathProcessors {
clientID := pp.RelevantClientID(chainID)
clientState, err := ccp.clientState(ctx, clientID)
var clientState provider.ClientState

newData := processor.ChainProcessorCacheData{
LatestBlock: ccp.latestBlock,
IBCMessagesCache: ibcMessagesCache.Clone(),
InSync: true,
ConnectionStateCache: ccp.connectionStateCache.FilterForClient(clientID),
ChannelStateCache: ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients),
}

if err := eg.Wait(); err != nil {
ccp.log.Warn(
"Error querying IBC header",
zap.Uint64("height", ccp.latestBlock.Height),
zap.Error(err),
)
} else {
newData.LatestHeader = latestHeader
newData.IBCHeaderCache = ibcHeaderCache.Clone()
}

clientState, err = ccp.clientState(ctx, clientID)
if err != nil {
ccp.log.Error("Error fetching client state",
zap.String("client_id", clientID),
zap.Error(err),
)
continue
} else {
newData.ClientState = clientState
}

pp.HandleNewData(chainID, processor.ChainProcessorCacheData{
LatestBlock: ccp.latestBlock,
LatestHeader: latestHeader,
IBCMessagesCache: ibcMessagesCache.Clone(),
InSync: true,
ClientState: clientState,
ConnectionStateCache: ccp.connectionStateCache.FilterForClient(clientID),
ChannelStateCache: ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients),
IBCHeaderCache: ibcHeaderCache.Clone(),
})
pp.HandleNewData(chainID, newData)
}
}
}
@@ -602,7 +634,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
// attempt to switchover to websocket
// TODO fixme: can be a few blocks in between when we switch over and when we start processing blocks
// mostly fine because we have the periodic flush, but could be improved.
return switchToSubscribeErr
return errSwitchToSubscribe
} else {
ccp.log.Info("Chain is not yet in sync",
zap.Int64("latest_queried_block", persistence.latestQueriedBlock),
16 changes: 11 additions & 5 deletions relayer/processor/path_end_runtime.go
@@ -370,9 +370,11 @@ func (pathEnd *pathEndRuntime) checkForMisbehaviour(
}

func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func(), d ChainProcessorCacheData, counterpartyChainID string, counterpartyInSync bool, messageLifecycle MessageLifecycle, counterParty *pathEndRuntime) {
pathEnd.lastClientUpdateHeightMu.Lock()
pathEnd.latestBlock = d.LatestBlock
pathEnd.lastClientUpdateHeightMu.Unlock()
if d.LatestBlock.Height != 0 {
pathEnd.lastClientUpdateHeightMu.Lock()
pathEnd.latestBlock = d.LatestBlock
pathEnd.lastClientUpdateHeightMu.Unlock()
}

pathEnd.inSync = d.InSync
if d.LatestHeader != nil {
@@ -409,8 +411,12 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func()
return
}

pathEnd.connectionStateCache = d.ConnectionStateCache // Update latest connection open state for chain
pathEnd.channelStateCache = d.ChannelStateCache // Update latest channel open state for chain
if d.ConnectionStateCache != nil {
pathEnd.connectionStateCache = d.ConnectionStateCache // Update latest connection open state for chain
}
if d.ChannelStateCache != nil {
pathEnd.channelStateCache = d.ChannelStateCache // Update latest channel open state for chain
}

pathEnd.mergeMessageCache(d.IBCMessagesCache, counterpartyChainID, pathEnd.inSync && counterpartyInSync) // Merge incoming packet IBC messages into the backlog
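The consumer side mirrors the producer: a zero LatestBlock height or a nil state cache now means "no update this round", so mergeCacheData keeps the previous values rather than overwriting them with empty data. A minimal sketch of that convention, with abbreviated, hypothetical field names:

package main

import "fmt"

// runtimeState is an abbreviated, hypothetical stand-in for pathEndRuntime.
type runtimeState struct {
	latestHeight uint64
	connCache    map[string]bool
}

// update mirrors the convention in mergeCacheData: zero height and nil cache
// both mean "nothing new this round".
type update struct {
	latestHeight uint64
	connCache    map[string]bool
}

func (r *runtimeState) merge(u update) {
	if u.latestHeight != 0 {
		r.latestHeight = u.latestHeight
	}
	if u.connCache != nil {
		r.connCache = u.connCache
	}
}

func main() {
	r := runtimeState{latestHeight: 10, connCache: map[string]bool{"connection-0": true}}
	r.merge(update{})      // an empty update must not clobber prior state
	fmt.Printf("%+v\n", r) // {latestHeight:10 connCache:map[connection-0:true]}
}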
