diff --git a/changelog.md b/changelog.md index da05d7b9b4..1fa1f6d681 100644 --- a/changelog.md +++ b/changelog.md @@ -67,6 +67,8 @@ * [1883](https://github.com/zeta-chain/node/issues/1883) - zetaclient should check 'IsSupported' flag to pause/unpause a specific chain * [1633](https://github.com/zeta-chain/node/issues/1633) - zetaclient should be able to pick up new connector and erc20Custody addresses * [1944](https://github.com/zeta-chain/node/pull/1944) - fix evm signer unit tests +* [1888](https://github.com/zeta-chain/node/issues/1888) - zetaclient should stop inbound/outbound txs according to cross-chain flags +* [1970](https://github.com/zeta-chain/node/issues/1970) - remove the timeout in the evm outtx tracker processing thread ### Chores diff --git a/zetaclient/bitcoin/bitcoin_client.go b/zetaclient/bitcoin/bitcoin_client.go index 51bc1ab758..110c89c474 100644 --- a/zetaclient/bitcoin/bitcoin_client.go +++ b/zetaclient/bitcoin/bitcoin_client.go @@ -330,10 +330,12 @@ func (ob *BTCChainClient) WatchInTx() { defer ticker.Stop() ob.logger.InTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId) + sampledLogger := ob.logger.InTx.Sample(&zerolog.BasicSampler{N: 10}) for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchInTx: inbound observation is disabled for chain %d", ob.chain.ChainId) continue } err := ob.ObserveInTx() @@ -384,12 +386,6 @@ func (ob *BTCChainClient) postBlockHeader(tip int64) error { } func (ob *BTCChainClient) ObserveInTx() error { - // make sure inbound TXS / Send is enabled by the protocol - flags := ob.coreContext.GetCrossChainFlags() - if !flags.IsInboundEnabled { - return errors.New("inbound TXS / Send has been disabled by the protocol") - } - // get and update latest block height cnt, err := ob.rpcClient.GetBlockCount() if err != nil { @@ -438,6 +434,9 @@ func (ob *BTCChainClient) ObserveInTx() error { } // add block header to zetabridge + // TODO: consider having a separate ticker(from TSS scaning) for posting block headers + // https://github.com/zeta-chain/node/issues/1847 + flags := ob.coreContext.GetCrossChainFlags() if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled { err = ob.postBlockHeader(bn) if err != nil { @@ -583,11 +582,20 @@ func (ob *BTCChainClient) IsSendOutTxProcessed(cctx *types.CrossChainTx, logger // WatchGasPrice watches Bitcoin chain for gas rate and post to zetacore func (ob *BTCChainClient) WatchGasPrice() { + // report gas price right away as the ticker takes time to kick in + err := ob.PostGasPrice() + if err != nil { + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) + } + + // start gas price ticker ticker, err := clienttypes.NewDynamicTicker("Bitcoin_WatchGasPrice", ob.GetChainParams().GasPriceTicker) if err != nil { ob.logger.GasPrice.Error().Err(err).Msg("error creating ticker") return } + ob.logger.GasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d", + ob.chain.ChainId, ob.GetChainParams().GasPriceTicker) defer ticker.Stop() for { @@ -1122,10 +1130,13 @@ func (ob *BTCChainClient) WatchOutTx() { } defer ticker.Stop() + ob.logger.OutTx.Info().Msgf("WatchInTx started for chain %d", ob.chain.ChainId) + sampledLogger := ob.logger.OutTx.Sample(&zerolog.BasicSampler{N: 10}) for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { + if !corecontext.IsOutboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchOutTx: outbound observation is disabled for chain %d", ob.chain.ChainId) continue } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) diff --git a/zetaclient/bitcoin/bitcoin_client_test.go b/zetaclient/bitcoin/bitcoin_client_test.go index 58377cdc65..dfa576c9c1 100644 --- a/zetaclient/bitcoin/bitcoin_client_test.go +++ b/zetaclient/bitcoin/bitcoin_client_test.go @@ -712,12 +712,3 @@ func TestGetBtcEventErrors(t *testing.T) { require.Nil(t, event) }) } - -func TestBTCChainClient_ObserveInTx(t *testing.T) { - t.Run("should return error", func(t *testing.T) { - // create mainnet mock client - btcClient := MockBTCClientMainnet() - err := btcClient.ObserveInTx() - require.ErrorContains(t, err, "inbound TXS / Send has been disabled by the protocol") - }) -} diff --git a/zetaclient/bitcoin/inbound_tracker.go b/zetaclient/bitcoin/inbound_tracker.go index b948a6c979..e0dbd595cb 100644 --- a/zetaclient/bitcoin/inbound_tracker.go +++ b/zetaclient/bitcoin/inbound_tracker.go @@ -6,6 +6,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/zeta-chain/zetacore/pkg/coin" + corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" "github.com/zeta-chain/zetacore/zetaclient/types" "github.com/zeta-chain/zetacore/zetaclient/zetabridge" ) @@ -22,7 +23,7 @@ func (ob *BTCChainClient) WatchIntxTracker() { for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { continue } err := ob.ObserveTrackerSuggestions() diff --git a/zetaclient/core_context/zeta_core_context.go b/zetaclient/core_context/zeta_core_context.go index d35c502c86..346563be04 100644 --- a/zetaclient/core_context/zeta_core_context.go +++ b/zetaclient/core_context/zeta_core_context.go @@ -179,3 +179,15 @@ func (c *ZetaCoreContext) Update( c.currentTssPubkey = tssPubKey } } + +// IsOutboundObservationEnabled returns true if the chain is supported and outbound flag is enabled +func IsOutboundObservationEnabled(c *ZetaCoreContext, chainParams observertypes.ChainParams) bool { + flags := c.GetCrossChainFlags() + return chainParams.IsSupported && flags.IsOutboundEnabled +} + +// IsInboundObservationEnabled returns true if the chain is supported and inbound flag is enabled +func IsInboundObservationEnabled(c *ZetaCoreContext, chainParams observertypes.ChainParams) bool { + flags := c.GetCrossChainFlags() + return chainParams.IsSupported && flags.IsInboundEnabled +} diff --git a/zetaclient/core_context/zeta_core_context_test.go b/zetaclient/core_context/zeta_core_context_test.go index 14f36a1cc2..8745fdc95c 100644 --- a/zetaclient/core_context/zeta_core_context_test.go +++ b/zetaclient/core_context/zeta_core_context_test.go @@ -3,6 +3,7 @@ package corecontext_test import ( "testing" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/testutil/sample" @@ -12,6 +13,44 @@ import ( corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" ) +func assertPanic(t *testing.T, f func(), errorLog string) { + defer func() { + r := recover() + if r != nil { + require.Contains(t, r, errorLog) + } + }() + f() +} + +func getTestCoreContext( + evmChain chains.Chain, + evmChainParams *observertypes.ChainParams, + ccFlags observertypes.CrosschainFlags) *corecontext.ZetaCoreContext { + // create config + cfg := config.NewConfig() + cfg.EVMChainConfigs[evmChain.ChainId] = config.EVMConfig{ + Chain: evmChain, + } + // create core context + coreContext := corecontext.NewZetaCoreContext(cfg) + evmChainParamsMap := make(map[int64]*observertypes.ChainParams) + evmChainParamsMap[evmChain.ChainId] = evmChainParams + + // feed chain params + coreContext.Update( + &observertypes.Keygen{}, + []chains.Chain{evmChain}, + evmChainParamsMap, + nil, + "", + ccFlags, + true, + zerolog.Logger{}, + ) + return coreContext +} + func TestNewZetaCoreContext(t *testing.T) { t.Run("should create new zeta core context with empty config", func(t *testing.T) { testCfg := config.NewConfig() @@ -264,12 +303,60 @@ func TestUpdateZetaCoreContext(t *testing.T) { }) } -func assertPanic(t *testing.T, f func(), errorLog string) { - defer func() { - r := recover() - if r != nil { - require.Contains(t, r, errorLog) +func TestIsOutboundObservationEnabled(t *testing.T) { + // create test chain params and flags + evmChain := chains.EthChain() + ccFlags := *sample.CrosschainFlags() + chainParams := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: true, + } + + t.Run("should return true if chain is supported and outbound flag is enabled", func(t *testing.T) { + coreCTX := getTestCoreContext(evmChain, chainParams, ccFlags) + require.True(t, corecontext.IsOutboundObservationEnabled(coreCTX, *chainParams)) + }) + t.Run("should return false if chain is not supported yet", func(t *testing.T) { + paramsUnsupported := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: false, } - }() - f() + coreCTXUnsupported := getTestCoreContext(evmChain, paramsUnsupported, ccFlags) + require.False(t, corecontext.IsOutboundObservationEnabled(coreCTXUnsupported, *paramsUnsupported)) + }) + t.Run("should return false if outbound flag is disabled", func(t *testing.T) { + flagsDisabled := ccFlags + flagsDisabled.IsOutboundEnabled = false + coreCTXDisabled := getTestCoreContext(evmChain, chainParams, flagsDisabled) + require.False(t, corecontext.IsOutboundObservationEnabled(coreCTXDisabled, *chainParams)) + }) +} + +func TestIsInboundObservationEnabled(t *testing.T) { + // create test chain params and flags + evmChain := chains.EthChain() + ccFlags := *sample.CrosschainFlags() + chainParams := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: true, + } + + t.Run("should return true if chain is supported and inbound flag is enabled", func(t *testing.T) { + coreCTX := getTestCoreContext(evmChain, chainParams, ccFlags) + require.True(t, corecontext.IsInboundObservationEnabled(coreCTX, *chainParams)) + }) + t.Run("should return false if chain is not supported yet", func(t *testing.T) { + paramsUnsupported := &observertypes.ChainParams{ + ChainId: evmChain.ChainId, + IsSupported: false, + } + coreCTXUnsupported := getTestCoreContext(evmChain, paramsUnsupported, ccFlags) + require.False(t, corecontext.IsInboundObservationEnabled(coreCTXUnsupported, *paramsUnsupported)) + }) + t.Run("should return false if inbound flag is disabled", func(t *testing.T) { + flagsDisabled := ccFlags + flagsDisabled.IsInboundEnabled = false + coreCTXDisabled := getTestCoreContext(evmChain, chainParams, flagsDisabled) + require.False(t, corecontext.IsInboundObservationEnabled(coreCTXDisabled, *chainParams)) + }) } diff --git a/zetaclient/evm/evm_client.go b/zetaclient/evm/evm_client.go index f94536be25..450176df90 100644 --- a/zetaclient/evm/evm_client.go +++ b/zetaclient/evm/evm_client.go @@ -613,61 +613,48 @@ func (ob *ChainClient) IsSendOutTxProcessed(cctx *crosschaintypes.CrossChainTx, // WatchOutTx watches evm chain for outgoing txs status func (ob *ChainClient) WatchOutTx() { - // read env variables if set - timeoutNonce, err := strconv.Atoi(os.Getenv("OS_TIMEOUT_NONCE")) - if err != nil || timeoutNonce <= 0 { - timeoutNonce = 100 * 3 // process up to 100 hashes - } - ob.logger.OutTx.Info().Msgf("WatchOutTx: using timeoutNonce %d seconds", timeoutNonce) - ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchOutTx_%d", ob.chain.ChainId), ob.GetChainParams().OutTxTicker) if err != nil { ob.logger.OutTx.Error().Err(err).Msg("error creating ticker") return } + ob.logger.OutTx.Info().Msgf("WatchOutTx started for chain %d", ob.chain.ChainId) + sampledLogger := ob.logger.OutTx.Sample(&zerolog.BasicSampler{N: 10}) defer ticker.Stop() for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { + if !corecontext.IsOutboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchOutTx: outbound observation is disabled for chain %d", ob.chain.ChainId) continue } trackers, err := ob.zetaClient.GetAllOutTxTrackerByChain(ob.chain.ChainId, interfaces.Ascending) if err != nil { continue } - //FIXME: remove this timeout here to ensure that all trackers are queried - outTimeout := time.After(time.Duration(timeoutNonce) * time.Second) - TRACKERLOOP: for _, tracker := range trackers { nonceInt := tracker.Nonce if ob.isTxConfirmed(nonceInt) { // Go to next tracker if this one already has a confirmed tx continue } txCount := 0 - var receipt *ethtypes.Receipt - var transaction *ethtypes.Transaction + var outtxReceipt *ethtypes.Receipt + var outtx *ethtypes.Transaction for _, txHash := range tracker.HashList { - select { - case <-outTimeout: - ob.logger.OutTx.Warn().Msgf("WatchOutTx: timeout on chain %d nonce %d", ob.chain.ChainId, nonceInt) - break TRACKERLOOP - default: - if recpt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok { - txCount++ - receipt = recpt - transaction = tx - ob.logger.OutTx.Info().Msgf("WatchOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt) - if txCount > 1 { - ob.logger.OutTx.Error().Msgf( - "WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, receipt, transaction) - } + if receipt, tx, ok := ob.checkConfirmedTx(txHash.TxHash, nonceInt); ok { + txCount++ + outtxReceipt = receipt + outtx = tx + ob.logger.OutTx.Info().Msgf("WatchOutTx: confirmed outTx %s for chain %d nonce %d", txHash.TxHash, ob.chain.ChainId, nonceInt) + if txCount > 1 { + ob.logger.OutTx.Error().Msgf( + "WatchOutTx: checkConfirmedTx passed, txCount %d chain %d nonce %d receipt %v transaction %v", txCount, ob.chain.ChainId, nonceInt, outtxReceipt, outtx) } } } if txCount == 1 { // should be only one txHash confirmed for each nonce. - ob.SetTxNReceipt(nonceInt, receipt, transaction) + ob.SetTxNReceipt(nonceInt, outtxReceipt, outtx) } else if txCount > 1 { // should not happen. We can't tell which txHash is true. It might happen (e.g. glitchy/hacked endpoint) ob.logger.OutTx.Error().Msgf("WatchOutTx: confirmed multiple (%d) outTx for chain %d nonce %d", txCount, ob.chain.ChainId, nonceInt) } @@ -854,8 +841,8 @@ func (ob *ChainClient) WatchInTx() { for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { - sampledLogger.Info().Msgf("WatchInTx: chain %d is not supported", ob.chain.ChainId) + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { + sampledLogger.Info().Msgf("WatchInTx: inbound observation is disabled for chain %d", ob.chain.ChainId) continue } err := ob.observeInTX(sampledLogger) @@ -918,12 +905,6 @@ func (ob *ChainClient) postBlockHeader(tip uint64) error { } func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error { - // make sure inbound TXS / Send is enabled by the protocol - flags := ob.coreContext.GetCrossChainFlags() - if !flags.IsInboundEnabled { - return errors.New("inbound TXS / Send has been disabled by the protocol") - } - // get and update latest block height blockNumber, err := ob.evmClient.BlockNumber(context.Background()) if err != nil { @@ -961,7 +942,7 @@ func (ob *ChainClient) observeInTX(sampledLogger zerolog.Logger) error { lastScannedDeposited := ob.ObserveERC20Deposited(startBlock, toBlock) // task 3: query the incoming tx to TSS address (read at most 100 blocks in one go) - lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock, flags) + lastScannedTssRecvd := ob.ObserverTSSReceive(startBlock, toBlock) // note: using lowest height for all 3 events is not perfect, but it's simple and good enough lastScannedLowest := lastScannedZetaSent @@ -1138,11 +1119,13 @@ func (ob *ChainClient) ObserveERC20Deposited(startBlock, toBlock uint64) uint64 // ObserverTSSReceive queries the incoming gas asset to TSS address and posts to zetabridge // returns the last block successfully scanned -func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags observertypes.CrosschainFlags) uint64 { +func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64) uint64 { // query incoming gas asset for bn := startBlock; bn <= toBlock; bn++ { // post new block header (if any) to zetabridge and ignore error // TODO: consider having a independent ticker(from TSS scaning) for posting block headers + // https://github.com/zeta-chain/node/issues/1847 + flags := ob.coreContext.GetCrossChainFlags() if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsEthTypeChainEnabled && chains.IsHeaderSupportedEvmChain(ob.chain.ChainId) { // post block header for supported chains @@ -1165,23 +1148,20 @@ func (ob *ChainClient) ObserverTSSReceive(startBlock, toBlock uint64, flags obse // WatchGasPrice watches evm chain for gas prices and post to zetacore func (ob *ChainClient) WatchGasPrice() { - ob.logger.GasPrice.Info().Msg("WatchGasPrice starting...") + // report gas price right away as the ticker takes time to kick in err := ob.PostGasPrice() if err != nil { - height, err := ob.zetaClient.GetBlockHeight() - if err != nil { - ob.logger.GasPrice.Error().Err(err).Msg("GetBlockHeight error") - } else { - ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height) - } + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } + // start gas price ticker ticker, err := clienttypes.NewDynamicTicker(fmt.Sprintf("EVM_WatchGasPrice_%d", ob.chain.ChainId), ob.GetChainParams().GasPriceTicker) if err != nil { ob.logger.GasPrice.Error().Err(err).Msg("NewDynamicTicker error") return } - ob.logger.GasPrice.Info().Msgf("WatchGasPrice started with interval %d", ob.GetChainParams().GasPriceTicker) + ob.logger.GasPrice.Info().Msgf("WatchGasPrice started for chain %d with interval %d", + ob.chain.ChainId, ob.GetChainParams().GasPriceTicker) defer ticker.Stop() for { @@ -1192,12 +1172,7 @@ func (ob *ChainClient) WatchGasPrice() { } err = ob.PostGasPrice() if err != nil { - height, err := ob.zetaClient.GetBlockHeight() - if err != nil { - ob.logger.GasPrice.Error().Err(err).Msg("GetBlockHeight error") - } else { - ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error at zeta block : %d ", height) - } + ob.logger.GasPrice.Error().Err(err).Msgf("PostGasPrice error for chain %d", ob.chain.ChainId) } ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.GasPrice) case <-ob.stop: diff --git a/zetaclient/evm/inbounds.go b/zetaclient/evm/inbounds.go index d6f6fcb223..819ab09f77 100644 --- a/zetaclient/evm/inbounds.go +++ b/zetaclient/evm/inbounds.go @@ -8,6 +8,7 @@ import ( "strings" sdkmath "cosmossdk.io/math" + ethcommon "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/onrik/ethrpc" "github.com/pkg/errors" @@ -15,13 +16,12 @@ import ( "github.com/zeta-chain/protocol-contracts/pkg/contracts/evm/zetaconnector.non-eth.sol" "github.com/zeta-chain/zetacore/pkg/chains" "github.com/zeta-chain/zetacore/pkg/coin" + "github.com/zeta-chain/zetacore/pkg/constant" + "github.com/zeta-chain/zetacore/x/crosschain/types" "github.com/zeta-chain/zetacore/zetaclient/compliance" "github.com/zeta-chain/zetacore/zetaclient/config" + corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" clienttypes "github.com/zeta-chain/zetacore/zetaclient/types" - - ethcommon "github.com/ethereum/go-ethereum/common" - "github.com/zeta-chain/zetacore/pkg/constant" - "github.com/zeta-chain/zetacore/x/crosschain/types" "github.com/zeta-chain/zetacore/zetaclient/zetabridge" "golang.org/x/net/context" ) @@ -43,7 +43,7 @@ func (ob *ChainClient) WatchIntxTracker() { for { select { case <-ticker.C(): - if !ob.GetChainParams().IsSupported { + if !corecontext.IsInboundObservationEnabled(ob.coreContext, ob.GetChainParams()) { continue } err := ob.ObserveIntxTrackers() diff --git a/zetaclient/zetacore_observer.go b/zetaclient/zetacore_observer.go index 34d0774507..415b5d5d21 100644 --- a/zetaclient/zetacore_observer.go +++ b/zetaclient/zetacore_observer.go @@ -5,21 +5,18 @@ import ( "math" "time" + sdkmath "cosmossdk.io/math" ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/rs/zerolog" "github.com/zeta-chain/zetacore/pkg/chains" + "github.com/zeta-chain/zetacore/x/crosschain/types" + observertypes "github.com/zeta-chain/zetacore/x/observer/types" appcontext "github.com/zeta-chain/zetacore/zetaclient/app_context" "github.com/zeta-chain/zetacore/zetaclient/bitcoin" corecontext "github.com/zeta-chain/zetacore/zetaclient/core_context" "github.com/zeta-chain/zetacore/zetaclient/interfaces" - "github.com/zeta-chain/zetacore/zetaclient/outtxprocessor" - - observertypes "github.com/zeta-chain/zetacore/x/observer/types" - - sdkmath "cosmossdk.io/math" - - "github.com/rs/zerolog" - "github.com/zeta-chain/zetacore/x/crosschain/types" "github.com/zeta-chain/zetacore/zetaclient/metrics" + "github.com/zeta-chain/zetacore/zetaclient/outtxprocessor" ) const ( @@ -154,8 +151,7 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) { co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: getTargetChainOb failed for chain %d", c.ChainId) continue } - if !ob.GetChainParams().IsSupported { - co.logger.ZetaChainWatcher.Info().Msgf("startCctxScheduler: chain %d is not supported", c.ChainId) + if !corecontext.IsOutboundObservationEnabled(coreContext, ob.GetChainParams()) { continue } @@ -178,6 +174,7 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) { continue } } + // update last processed block number lastBlockNum = bn metrics.LastCoreBlockNumber.Set(float64(lastBlockNum))