diff --git a/src/lib/monitor/l1.ts b/src/lib/monitor/l1.ts index 4693255..43e1c19 100644 --- a/src/lib/monitor/l1.ts +++ b/src/lib/monitor/l1.ts @@ -10,7 +10,8 @@ import { import { ExecutorDepositTxEntity, ExecutorUnconfirmedTxEntity, - ExecutorOutputEntity + ExecutorOutputEntity, + StateEntity } from '../../orm' import { EntityManager } from 'typeorm' import { RPCClient, RPCSocket } from '../rpc' @@ -21,6 +22,7 @@ import { TxWalletL2, WalletType, getWallet, initWallet } from '../walletL2' export class L1Monitor extends Monitor { executorL2: TxWalletL2 + oracleHeight: number constructor( public socket: RPCSocket, @@ -31,6 +33,8 @@ export class L1Monitor extends Monitor { [this.db] = getDB() initWallet(WalletType.Executor, config.l2lcd) this.executorL2 = getWallet(WalletType.Executor) + + this.oracleHeight = 0 } public name(): string { @@ -58,6 +62,13 @@ export class L1Monitor extends Monitor { } public async prepareMonitor(): Promise { + const state = await this.db.getRepository(StateEntity).findOne({ + where: { + name: 'oracle_height' + } + }) + this.oracleHeight = state?.height || 0 + const bridgeInfoL1 = await config.l1lcd.ophost.bridgeInfo(config.BRIDGE_ID) try { @@ -73,8 +84,9 @@ export class L1Monitor extends Monitor { const errMsg = this.helper.extractErrorMessage(err) if (errMsg.includes('bridge info not found')) { // not found bridge info in l2, set bridge info - await this.setBridgeInfo(bridgeInfoL1, '') + return await this.setBridgeInfo(bridgeInfoL1, '') } + throw err } } @@ -84,7 +96,8 @@ export class L1Monitor extends Monitor { const latestHeight = this.socket.latestHeight const latestTx0 = this.socket.latestTx0 - if (!latestHeight || !latestTx0) return + if (!latestHeight || !latestTx0 || this.oracleHeight == latestHeight) + return const msgs = [ new MsgUpdateOracle( @@ -101,6 +114,11 @@ export class L1Monitor extends Monitor { Succeeded to update oracle tx in height: ${this.currentHeight} ${latestHeight} ${latestTx0} ` ) + + this.oracleHeight = latestHeight + await this.db + .getRepository(StateEntity) + .save({ name: 'oracle_height', height: this.oracleHeight }) } catch (err) { const errMsg = this.helper.extractErrorMessage(err) this.logger.error( diff --git a/src/lib/monitor/monitor.ts b/src/lib/monitor/monitor.ts index 9407f4a..d077e20 100644 --- a/src/lib/monitor/monitor.ts +++ b/src/lib/monitor/monitor.ts @@ -115,9 +115,6 @@ export abstract class Monitor { await this.handleBlockWithStateUpdate(manager) } }) - } catch (err) { - this.logger.error(err) - this.stop() } finally { await Bluebird.delay(INTERVAL_MONITOR) } diff --git a/src/lib/rpc.ts b/src/lib/rpc.ts index b804196..860e574 100644 --- a/src/lib/rpc.ts +++ b/src/lib/rpc.ts @@ -2,6 +2,8 @@ import * as winston from 'winston' import axios, { AxiosRequestConfig } from 'axios' import Websocket from 'ws' +const MAX_RETRY = 10 + export class RPCSocket { public ws: Websocket public wsUrl: string @@ -14,6 +16,7 @@ export class RPCSocket { logger: winston.Logger rpcUrl: string curRPCUrlIndex: number + retry = 0 constructor( public rpcUrls: string[], @@ -72,6 +75,7 @@ export class RPCSocket { const msg = `${this.constructor.name} is now alive. (downtime ${downtime} minutes)` this.logger.info(msg) this.isAlive = true + this.retry = 0 } this.alivedAt = Date.now() } @@ -121,9 +125,13 @@ export class RPCSocket { protected onDisconnect(code: number, reason: string): void { this.rotateRPC() + this.retry++ this.logger.info( `${this.constructor.name}: websocket disconnected (${code}: ${reason})` ) + if (this.retry > MAX_RETRY) { + throw new Error(`RPCSocket max retry reached ${this.rpcUrl}`) + } // if disconnected, try connect again setTimeout(() => this.connect(), 1000) } diff --git a/src/worker/bridgeExecutor/index.ts b/src/worker/bridgeExecutor/index.ts index e0b317d..021e8a2 100644 --- a/src/worker/bridgeExecutor/index.ts +++ b/src/worker/bridgeExecutor/index.ts @@ -38,12 +38,16 @@ async function runBot(): Promise { } } -function stopBot(): void { - monitors.forEach((monitor) => monitor.stop()) +async function stopBot(): Promise { + await Promise.all( + monitors.map((monitor) => { + monitor.stop() + }) + ) } export async function stopExecutor(): Promise { - stopBot() + await stopBot() logger.info('Closing listening port') finalizeServer()