From de6e15966d053aec76c835b3b32d45e68cdbab28 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Mon, 20 Nov 2023 23:33:50 -0700 Subject: [PATCH] parallelize query --- .../chains/cosmos/cosmos_chain_processor.go | 138 +++++++++++------- relayer/processor/path_end_runtime.go | 16 +- 2 files changed, 96 insertions(+), 58 deletions(-) diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go index 9ac8e934a..fe2a52a90 100644 --- a/relayer/chains/cosmos/cosmos_chain_processor.go +++ b/relayer/chains/cosmos/cosmos_chain_processor.go @@ -26,7 +26,7 @@ import ( ) // Used by the query cycle to attempt to switch to websocket connection once chains are in sync -var switchToSubscribeErr = errors.New("switching to subscribe") +var errSwitchToSubscribe = errors.New("switching to subscribe") type CosmosChainProcessor struct { log *zap.Logger @@ -282,7 +282,8 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui for { if err := ccp.queryCycle(ctx, &persistence, stuckPacket); err != nil { - if errors.Is(err, switchToSubscribeErr) { + if errors.Is(err, errSwitchToSubscribe) { + ccp.log.Debug("Attempting to switch to websocket mode") if ccp.chainProvider.cometLegacyBlockResults { if err := ccp.subscribeLegacy(ctx); err != nil { ccp.log.Error("Error subscribing to legacy websocket", zap.Error(err)) @@ -330,6 +331,8 @@ func (ccp *CosmosChainProcessor) subscribeLegacy(ctx context.Context) error { var latestHeader provider.IBCHeader var isBlockEvent bool + var eg errgroup.Group + select { case <-ctx.Done(): return nil @@ -339,22 +342,21 @@ func (ccp *CosmosChainProcessor) subscribeLegacy(ctx context.Context) error { ccp.log.Debug("Received new legacy block event", zap.Int64("height", blockEvent.Block.Height)) - // TODO try to avoid this query by getting the SignedHeader via websocket. - // If we can't, add retry logic. - ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, blockEvent.Block.Height) - if err != nil { - ccp.log.Error("Error querying IBC header", - zap.Int64("height", blockEvent.Block.Height), - zap.Error(err), - ) - continue - } + heightUint64 := uint64(blockEvent.Block.Height) - latestHeader = ibcHeader.(provider.TendermintIBCHeader) + eg.Go(func() (err error) { + // TODO try to avoid this query by getting the SignedHeader via websocket. + // If we can't, add retry logic. + ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, blockEvent.Block.Height) + if err != nil { + return err + } - heightUint64 := uint64(blockEvent.Block.Height) + latestHeader = ibcHeader.(provider.TendermintIBCHeader) + ibcHeaderCache[heightUint64] = latestHeader - ibcHeaderCache[heightUint64] = latestHeader + return nil + }) beginBlockMsgs := ccp.ibcMessagesFromBlockEvents( chains.ConvertEvents(blockEvent.ResultBeginBlock.Events), @@ -378,10 +380,11 @@ func (ccp *CosmosChainProcessor) subscribeLegacy(ctx context.Context) error { Height: heightUint64, Time: blockEvent.Block.Time, } - case event := <-txChan: txEvent := event.Data.(legacycomettypes.EventDataTx) + ccp.log.Debug("Received new legacy tx event", zap.Int64("height", txEvent.Height)) + tx := txEvent.TxResult.Result if tx.Code != 0 { // tx was not successful @@ -397,27 +400,40 @@ func (ccp *CosmosChainProcessor) subscribeLegacy(ctx context.Context) error { for _, pp := range ccp.pathProcessors { clientID := pp.RelevantClientID(chainID) var clientState provider.ClientState + + newData := processor.ChainProcessorCacheData{ + IBCMessagesCache: ibcMessagesCache.Clone(), + InSync: true, + } + if isBlockEvent { + newData.LatestBlock = ccp.latestBlock + newData.ConnectionStateCache = ccp.connectionStateCache.FilterForClient(clientID) + newData.ChannelStateCache = ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients) + + if err := eg.Wait(); err != nil { + ccp.log.Warn( + "Error querying IBC header", + zap.Uint64("height", ccp.latestBlock.Height), + zap.Error(err), + ) + } else { + newData.LatestHeader = latestHeader + newData.IBCHeaderCache = ibcHeaderCache.Clone() + } + clientState, err = ccp.clientState(ctx, clientID) if err != nil { ccp.log.Error("Error fetching client state", zap.String("client_id", clientID), zap.Error(err), ) - continue + } else { + newData.ClientState = clientState } } - pp.HandleNewData(chainID, processor.ChainProcessorCacheData{ - LatestBlock: ccp.latestBlock, - LatestHeader: latestHeader, - IBCMessagesCache: ibcMessagesCache.Clone(), - InSync: true, - ClientState: clientState, - ConnectionStateCache: ccp.connectionStateCache.FilterForClient(clientID), - ChannelStateCache: ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients), - IBCHeaderCache: ibcHeaderCache.Clone(), - }) + pp.HandleNewData(chainID, newData) } } } @@ -439,7 +455,9 @@ func (ccp *CosmosChainProcessor) subscribe(ctx context.Context) error { ibcMessagesCache := processor.NewIBCMessagesCache() ibcHeaderCache := make(processor.IBCHeaderCache) - var latestHeader provider.TendermintIBCHeader + var latestHeader provider.IBCHeader + + var eg errgroup.Group select { case <-ctx.Done(): @@ -449,20 +467,21 @@ func (ccp *CosmosChainProcessor) subscribe(ctx context.Context) error { ccp.log.Debug("Received new block event", zap.Int64("height", blockEvent.Block.Height)) - // TODO try to avoid this query by getting the SignedHeader via websocket. - // If we can't, add retry logic. - ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, blockEvent.Block.Height) - if err != nil { - ccp.log.Error("Error querying IBC header", - zap.Int64("height", blockEvent.Block.Height), - zap.Error(err), - ) - continue - } + heightUint64 := uint64(blockEvent.Block.Height) - latestHeader = ibcHeader.(provider.TendermintIBCHeader) + eg.Go(func() (err error) { + // TODO try to avoid this query by getting the SignedHeader via websocket. + // If we can't, add retry logic. + ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, blockEvent.Block.Height) + if err != nil { + return err + } - heightUint64 := uint64(blockEvent.Block.Height) + latestHeader = ibcHeader.(provider.TendermintIBCHeader) + ibcHeaderCache[heightUint64] = latestHeader + + return nil + }) blockMsgs := ccp.ibcMessagesFromBlockEvents( blockEvent.ResultFinalizeBlock.Events, @@ -495,25 +514,38 @@ func (ccp *CosmosChainProcessor) subscribe(ctx context.Context) error { for _, pp := range ccp.pathProcessors { clientID := pp.RelevantClientID(chainID) - clientState, err := ccp.clientState(ctx, clientID) + var clientState provider.ClientState + + newData := processor.ChainProcessorCacheData{ + LatestBlock: ccp.latestBlock, + IBCMessagesCache: ibcMessagesCache.Clone(), + InSync: true, + ConnectionStateCache: ccp.connectionStateCache.FilterForClient(clientID), + ChannelStateCache: ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients), + } + + if err := eg.Wait(); err != nil { + ccp.log.Warn( + "Error querying IBC header", + zap.Uint64("height", ccp.latestBlock.Height), + zap.Error(err), + ) + } else { + newData.LatestHeader = latestHeader + newData.IBCHeaderCache = ibcHeaderCache.Clone() + } + + clientState, err = ccp.clientState(ctx, clientID) if err != nil { ccp.log.Error("Error fetching client state", zap.String("client_id", clientID), zap.Error(err), ) - continue + } else { + newData.ClientState = clientState } - pp.HandleNewData(chainID, processor.ChainProcessorCacheData{ - LatestBlock: ccp.latestBlock, - LatestHeader: latestHeader, - IBCMessagesCache: ibcMessagesCache.Clone(), - InSync: true, - ClientState: clientState, - ConnectionStateCache: ccp.connectionStateCache.FilterForClient(clientID), - ChannelStateCache: ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients), - IBCHeaderCache: ibcHeaderCache.Clone(), - }) + pp.HandleNewData(chainID, newData) } } } @@ -602,7 +634,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu // attempt to switchover to websocket // TODO fixme: can be a few blocks in between when we switch over and when we start processing blocks // mostly fine because we have the periodic flush, but could be improved. - return switchToSubscribeErr + return errSwitchToSubscribe } else { ccp.log.Info("Chain is not yet in sync", zap.Int64("latest_queried_block", persistence.latestQueriedBlock), diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index 0b3a7da65..aa3853c17 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -370,9 +370,11 @@ func (pathEnd *pathEndRuntime) checkForMisbehaviour( } func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func(), d ChainProcessorCacheData, counterpartyChainID string, counterpartyInSync bool, messageLifecycle MessageLifecycle, counterParty *pathEndRuntime) { - pathEnd.lastClientUpdateHeightMu.Lock() - pathEnd.latestBlock = d.LatestBlock - pathEnd.lastClientUpdateHeightMu.Unlock() + if d.LatestBlock.Height != 0 { + pathEnd.lastClientUpdateHeightMu.Lock() + pathEnd.latestBlock = d.LatestBlock + pathEnd.lastClientUpdateHeightMu.Unlock() + } pathEnd.inSync = d.InSync if d.LatestHeader != nil { @@ -409,8 +411,12 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func() return } - pathEnd.connectionStateCache = d.ConnectionStateCache // Update latest connection open state for chain - pathEnd.channelStateCache = d.ChannelStateCache // Update latest channel open state for chain + if d.ConnectionStateCache != nil { + pathEnd.connectionStateCache = d.ConnectionStateCache // Update latest connection open state for chain + } + if d.ChannelStateCache != nil { + pathEnd.channelStateCache = d.ChannelStateCache // Update latest channel open state for chain + } pathEnd.mergeMessageCache(d.IBCMessagesCache, counterpartyChainID, pathEnd.inSync && counterpartyInSync) // Merge incoming packet IBC messages into the backlog