diff --git a/apollo/src/aggregation/aggregation.service.ts b/apollo/src/aggregation/aggregation.service.ts index ced3cf0..34b16bb 100644 --- a/apollo/src/aggregation/aggregation.service.ts +++ b/apollo/src/aggregation/aggregation.service.ts @@ -157,7 +157,7 @@ export class AggregationService extends PrismaClient implements OnModuleInit { skip, take, where, - orderBy, + orderBy: orderBy ?? { startTime: Prisma.SortOrder.asc }, }); const total = await this.historyRecord.count({ where }); diff --git a/apollo/src/lnv2/lnv2.service.ts b/apollo/src/lnv2/lnv2.service.ts index 828089f..58a670c 100644 --- a/apollo/src/lnv2/lnv2.service.ts +++ b/apollo/src/lnv2/lnv2.service.ts @@ -22,6 +22,7 @@ export interface FetchCacheRelayInfo extends FetchCacheInfo { confirmedNonce: string; latestRelayerInfoNonce: number; latestRelayerInfoTargetNonce: number; + latestFillInfoTimestamp: number; } export interface BridgeIndexInfo { @@ -48,7 +49,7 @@ export class Lnv2Service implements OnModuleInit { name: 'lnBridgeV2', fetchHistoryDataFirst: 10, fetchSendDataInterval: 10000, - takeEachTime: 5, + takeEachTime: 2, }; // default cache, opposite cache @@ -59,6 +60,7 @@ export class Lnv2Service implements OnModuleInit { confirmedNonce: '0', latestRelayerInfoNonce: -1, latestRelayerInfoTargetNonce: -1, + latestFillInfoTimestamp: -1, isSyncingHistory: false, skip: 0, })); @@ -122,6 +124,7 @@ export class Lnv2Service implements OnModuleInit { await this.repairReorg(item, indexInfo); // from source chain await this.fetchRecords(item, indexInfo); + await this.batchFetchStatus(item, indexInfo); // from target chain await this.fetchStatus(item, indexInfo); this.fetchCache[index].isSyncingHistory = false; @@ -269,7 +272,7 @@ export class Lnv2Service implements OnModuleInit { ); latestNonce = firstRecord ? Number(firstRecord.nonce) : 0; } - const query = `query { lnv2TransferRecords(first: 20, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, remoteChainId, nonce, provider, sender, receiver, sourceToken, targetToken, amount, transactionHash, timestamp, fee } }`; + const query = `query { lnv2TransferRecords(first: 30, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, remoteChainId, nonce, provider, sender, receiver, sourceToken, targetToken, amount, transactionHash, timestamp, fee } }`; const records = await axios .post(indexInfo.url, { @@ -357,6 +360,84 @@ export class Lnv2Service implements OnModuleInit { } } + // batch get status from target chain on sycing historical phase + async queryFillInfos(indexInfo: BridgeIndexInfo, latestTimestamp: number) { + const url = indexInfo.url; + const query = `query { lnv2RelayRecords(first: 30, orderBy: timestamp, orderDirection: asc, where: {timestamp_gt: "${latestTimestamp}", slasher: null}) { id, timestamp, transactionHash, fee } }`; + return await axios + .post(url, { + query: query, + variables: null, + }).then((res) => res.data?.data?.lnv2RelayRecords); + } + + async batchFetchStatus(transfer: PartnerT3, indexInfo: BridgeIndexInfo) { + try { + let latestTimestamp = this.fetchCache[indexInfo.index].latestFillInfoTimestamp; + // stop sync history when timestamp set to zero + if (latestTimestamp === 0) { + return; + } else if (latestTimestamp === -1) { + const firstRecord = await this.aggregationService.queryHistoryRecordFirst( + { + toChain: transfer.chainName, + bridge: this.bridgeName(indexInfo), + result: RecordStatus.success, + }, + { nonce: 'desc' } + ); + latestTimestamp = firstRecord ? firstRecord.endTime : -1; + } + const relayRecords = await this.queryFillInfos(indexInfo, latestTimestamp); + if (relayRecords.length === 0) { + this.fetchCache[indexInfo.index].latestFillInfoTimestamp = 0; + this.logger.log(`lnv2 the batch sync end, chain: ${transfer.chainName}, lastTime: ${latestTimestamp}`); + return; + } + let batchAddCount = 0; + for (const relayRecord of relayRecords) { + // ignore slashed transfer + let rmvedTransferId = relayRecord.id; + if (rmvedTransferId.startsWith(`${transfer.chainId}-`)) { + rmvedTransferId = rmvedTransferId.replace(`${transfer.chainId}-`, ''); + } + const uncheckedRecord = await this.aggregationService + .queryHistoryRecordFirst({ + id: { + endsWith: rmvedTransferId, + } + }); + // the record exist but not finished + if (uncheckedRecord?.endTxHash === '' ) { + const updateData = { + result: RecordStatus.success, + responseTxHash: relayRecord.transactionHash, + endTxHash: relayRecord.transactionHash, + endTime: Number(relayRecord.timestamp), + }; + + await this.aggregationService.updateHistoryRecord({ + where: { id: uncheckedRecord.id }, + data: updateData, + }); + this.fetchCache[indexInfo.index].latestFillInfoTimestamp = updateData.endTime; + batchAddCount += 1; + } else if (uncheckedRecord) { + this.fetchCache[indexInfo.index].latestFillInfoTimestamp = Number(relayRecord.timestamp); + } + } + if (batchAddCount > 0) { + this.logger.log( + `lnv2 [${transfer.chainName}] batch fetch status, count: ${batchAddCount}` + ); + } + } catch(error) { + this.logger.warn(`batch fetch lnv2 status failed, error ${error}`); + } + } + + + // fetch status from target chain and source chain(slash result) // 1. relayed, finished // 2. cancel inited, save timestamp to check if users can cancel tx or ln can relay msg @@ -372,7 +453,7 @@ export class Lnv2Service implements OnModuleInit { fromChain: transfer.chainName, bridge: this.bridgeName(indexInfo), endTxHash: '', - }, + } }) .then((result) => result.records); @@ -529,7 +610,7 @@ export class Lnv2Service implements OnModuleInit { ); latestNonce = firstRecord ? Number(firstRecord.targetNonce) : 0; } - const query = `query { lnv2RelayUpdateRecords(first: 20, orderBy: nonce, orderDirection: asc, where: {updateType_in: [${RelayUpdateType.SLASH}, ${RelayUpdateType.WITHDRAW}]}, skip: ${latestNonce}) { id, remoteChainId, provider, margin, updateType, withdrawNonce, transactionHash, timestamp, sourceToken, targetToken } }`; + const query = `query { lnv2RelayUpdateRecords(first: 30, orderBy: nonce, orderDirection: asc, where: {updateType_in: [${RelayUpdateType.SLASH}, ${RelayUpdateType.WITHDRAW}]}, skip: ${latestNonce}) { id, remoteChainId, provider, margin, updateType, withdrawNonce, transactionHash, timestamp, sourceToken, targetToken } }`; const records = await axios .post(indexInfo.url, { query: query, @@ -652,7 +733,7 @@ export class Lnv2Service implements OnModuleInit { ); latestNonce = firstRecord ? Number(firstRecord.nonce) : 0; } - const query = `query { lnv2RelayUpdateRecords(first: 20, orderBy: nonce, orderDirection: asc, where: {updateType: ${RelayUpdateType.PROVIDER_UPDATE}}, skip: ${latestNonce}) { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, baseFee, liquidityFeeRate } }`; + const query = `query { lnv2RelayUpdateRecords(first: 30, orderBy: nonce, orderDirection: asc, where: {updateType: ${RelayUpdateType.PROVIDER_UPDATE}}, skip: ${latestNonce}) { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, baseFee, liquidityFeeRate } }`; const records = await axios .post(indexInfo.url, { @@ -668,6 +749,7 @@ export class Lnv2Service implements OnModuleInit { this.logger.warn(`query record failed, url: ${indexInfo.url}, query: ${query}`); return; } + let size = 0; // query nonce big then latestNonce for (const record of records) { // query by relayer @@ -740,9 +822,12 @@ export class Lnv2Service implements OnModuleInit { } latestNonce += 1; this.fetchCache[index].latestRelayerInfoNonce = latestNonce; - this.logger.log( - `update lnv2 relay info, id ${id}, type ${record.updateType}, basefee ${record.baseFee}, liquidityFee ${record.liquidityFeeRate}` - ); + size += 1; + } + if (size > 0) { + this.logger.log( + `update lnv2 relay info from source, size: ${size}` + ); } } catch (error) { this.logger.warn(`fetchFeeInfoFromSource failed, error ${error}`); @@ -764,7 +849,7 @@ export class Lnv2Service implements OnModuleInit { ); latestNonce = firstRecord ? Number(firstRecord.nonce) : 0; } - const query = `query { lnv2RelayUpdateRecords(first: 20, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, margin, baseFee, liquidityFeeRate } }`; + const query = `query { lnv2RelayUpdateRecords(first: 30, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, margin, baseFee, liquidityFeeRate } }`; const records = await axios .post(indexInfo.url, { @@ -780,6 +865,7 @@ export class Lnv2Service implements OnModuleInit { this.logger.warn(`query record failed, url: ${indexInfo.url}, query: ${query}`); return; } + let size = 0; // query nonce big then latestNonce for (const record of records) { // query by relayer @@ -862,9 +948,12 @@ export class Lnv2Service implements OnModuleInit { } latestNonce += 1; this.fetchCache[index].latestRelayerInfoNonce = latestNonce; - this.logger.log( - `update lnv2 relay info, id ${id}, type ${record.updateType}, margin ${record.margin}, basefee ${record.baseFee}, liquidityFee ${record.liquidityFeeRate}` - ); + size += 1; + } + if (size > 0) { + this.logger.log( + `update lnv2 relay info for opposite, size ${size}` + ); } } catch (error) { this.logger.warn(`fetchRelayInfo failed, error ${error}`); diff --git a/apollo/src/lnv3/lnv3.service.ts b/apollo/src/lnv3/lnv3.service.ts index b9c86b0..5beb472 100644 --- a/apollo/src/lnv3/lnv3.service.ts +++ b/apollo/src/lnv3/lnv3.service.ts @@ -18,11 +18,11 @@ export class Lnv3Service implements OnModuleInit { private fetchCache = new Array(this.transferService.transfers.length) .fill('') - .map((_) => ({ latestNonce: -1, latestRelayerInfoNonce: -1, isSyncingHistory: false })); + .map((_) => ({ latestNonce: -1, latestRelayerInfoNonce: -1, latestFillInfoTimestamp: -1, isSyncingHistory: false })); protected fetchSendDataInterval = 5000; - private readonly takeEachTime = 6; + private readonly takeEachTime = 2; private skip = new Array(this.transferService.transfers.length).fill(0); constructor( @@ -44,6 +44,7 @@ export class Lnv3Service implements OnModuleInit { this.fetchCache[index].isSyncingHistory = true; await this.fetchProviderInfo(item, index); await this.fetchRecords(item, index); + await this.batchFetchStatus(item, index); await this.fetchStatus(item, index); this.fetchCache[index].isSyncingHistory = false; } @@ -93,7 +94,7 @@ export class Lnv3Service implements OnModuleInit { }).then((res) => res.data?.data?.lnv3TransferRecords.items); } else { const url = transfer.url; - const query = `query { lnv3TransferRecords(first: 10, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, nonce, messageNonce, remoteChainId, provider, sourceToken, targetToken, sourceAmount, targetAmount, sender, receiver, timestamp, transactionHash, fee, transferId, hasWithdrawn } }`; + const query = `query { lnv3TransferRecords(first: 20, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, nonce, messageNonce, remoteChainId, provider, sourceToken, targetToken, sourceAmount, targetAmount, sender, receiver, timestamp, transactionHash, fee, transferId, hasWithdrawn } }`; return await axios .post(url, { query: query, @@ -112,7 +113,7 @@ export class Lnv3Service implements OnModuleInit { variables: null, }).then((res) => res.data?.data?.lnv3RelayUpdateRecords.items); } else { - const query = `query { lnv3RelayUpdateRecords(first: 10, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, penalty, baseFee, liquidityFeeRate, transferLimit, paused } }`; + const query = `query { lnv3RelayUpdateRecords(first: 50, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, penalty, baseFee, liquidityFeeRate, transferLimit, paused } }`; return await axios.post(transfer.url, { query: query, variables: null, @@ -161,6 +162,7 @@ export class Lnv3Service implements OnModuleInit { const records = await this.queryRecordInfo(transfer, latestNonce); if (records && records.length > 0) { + let size = 0; for (const record of records) { const toChain = this.getDestChain(record.remoteChainId.toString()); if (toChain === null) { @@ -169,6 +171,9 @@ export class Lnv3Service implements OnModuleInit { this.fetchCache[index].latestNonce = latestNonce; continue; } + if (toChain.url === null) { + continue; + } const fromToken = this.getTokenInfo(transfer, record.sourceToken.toLowerCase()); const toToken = this.getTokenInfo(toChain, record.targetToken.toLowerCase()); @@ -205,8 +210,11 @@ export class Lnv3Service implements OnModuleInit { lastRequestWithdraw: 0, }); latestNonce += 1; + size += 1; + } + if (size > 0) { this.logger.log( - `lnv3 [${transfer.chain}->${toChain.chain}] save new send record succeeded nonce: ${latestNonce}, id: ${record.id}` + `lnv3 [${transfer.chain}] save new send records succeeded nonce: ${latestNonce}, size: ${size}` ); } @@ -219,6 +227,95 @@ export class Lnv3Service implements OnModuleInit { } } + // batch get status from target chain on sycing historical phase + async queryFillInfos(transfer: PartnerT2, latestTimestamp: number) { + if (transfer.level0Indexer === Level0Indexer.ponder) { + const url = this.transferService.ponderEndpoint; + const query = `query { lnv3RelayRecords(limit: 50, orderBy: "timestamp", orderDirection: "asc", where: {localChainId: "${transfer.chainId}", slashed: false, timestamp_gt: "${latestTimestamp}"}) { items { id, timestamp, requestWithdrawTimestamp, relayer, transactionHash, slashed, fee } }}`; + return await axios + .post(url, { + query: query, + variables: null, + }).then((res) => res.data?.data?.lnv3RelayRecords.items); + } else { + const url = transfer.url; + const query = `query { lnv3RelayRecords(first: 20, orderBy: timestamp, orderDirection: asc, where: {timestamp_gt: "${latestTimestamp}", slashed: false}) { id, timestamp, requestWithdrawTimestamp, relayer, transactionHash, slashed, fee } }`; + return await axios + .post(url, { + query: query, + variables: null, + }).then((res) => res.data?.data?.lnv3RelayRecords); + } + } + + async batchFetchStatus(transfer: PartnerT2, index: number) { + try { + let latestTimestamp = this.fetchCache[index].latestFillInfoTimestamp; + // stop sync history when timestamp set to zero + if (latestTimestamp === 0) { + return; + } else if (latestTimestamp === -1) { + const firstRecord = await this.aggregationService.queryHistoryRecordFirst( + { + toChain: transfer.chain, + bridge: `lnv3`, + }, + { nonce: 'desc' } + ); + latestTimestamp = firstRecord ? firstRecord.endTime : -1; + } + const relayRecords = await this.queryFillInfos(transfer, latestTimestamp); + if (relayRecords.length === 0) { + this.fetchCache[index].latestFillInfoTimestamp = 0; + this.logger.log(`the batch sync end, chain: ${transfer.chain}`); + return; + } + let size = 0; + for (const relayRecord of relayRecords) { + // ignore slashed transfer + if (relayRecord.slashed) continue; + let rmvedTransferId = relayRecord.id; + if (rmvedTransferId.startsWith(`${transfer.chainId}-`)) { + rmvedTransferId = rmvedTransferId.replace(`${transfer.chainId}-`, ''); + } + const uncheckedRecord = await this.aggregationService + .queryHistoryRecordFirst({ + id: { + endsWith: rmvedTransferId, + } + }); + // the record exist but not finished + if (uncheckedRecord?.endTxHash === '' ) { + const updateData = { + result: RecordStatus.success, + responseTxHash: relayRecord.transactionHash, + endTxHash: relayRecord.transactionHash, + endTime: Number(relayRecord.timestamp), + relayer: relayRecord.relayer.toLowerCase(), + needWithdrawLiquidity: false, + lastRequestWithdraw: Number(relayRecord.requestWithdrawTimestamp), + }; + + await this.aggregationService.updateHistoryRecord({ + where: { id: uncheckedRecord.id }, + data: updateData, + }); + this.fetchCache[index].latestFillInfoTimestamp = updateData.endTime; + size += 1; + } else if (uncheckedRecord) { + this.fetchCache[index].latestFillInfoTimestamp = Number(relayRecord.timestamp); + } + } + if (size > 0) { + this.logger.log( + `lnv3 [${transfer.chain}] batch fetch status, size: ${size}` + ); + } + } catch(error) { + this.logger.warn(`batch fetch lnv3 status failed, error ${error}`); + } + } + async fetchStatus(transfer: PartnerT2, index: number) { try { const uncheckedRecords = await this.aggregationService @@ -239,6 +336,7 @@ export class Lnv3Service implements OnModuleInit { this.skip[index] += this.takeEachTime; } + let size = 0; for (const record of uncheckedRecords) { const recordSplitted = record.id.split('-'); const transferId = last(recordSplitted); @@ -302,9 +400,10 @@ export class Lnv3Service implements OnModuleInit { }, }); - this.logger.log( - `lnv3 [${transfer.chain}->${toChain.chain}] new status id: ${record.id} relayed responseTxHash: ${relayRecord.transactionHash}` - ); + size += 1; + //this.logger.log( + //`lnv3 [${transfer.chain}->${toChain.chain}] new status id: ${record.id} relayed responseTxHash: ${relayRecord.transactionHash}` + //); } // query withdrawLiquidity result if (needWithdrawLiquidity && requestWithdrawTimestamp > 0) { @@ -331,6 +430,12 @@ export class Lnv3Service implements OnModuleInit { } } } + + if (size > 0) { + this.logger.log( + `lnv3 [${transfer.chain}] update record status, size: ${size}` + ); + } } catch (error) { this.logger.warn(`fetch lnv3 status failed, error ${error}`); } @@ -370,6 +475,7 @@ export class Lnv3Service implements OnModuleInit { return; } + let size = 0; // query nonce big then latestNonce for (const record of records) { // query by relayer @@ -428,6 +534,7 @@ export class Lnv3Service implements OnModuleInit { paused: record.paused ?? false, messageChannel: channel.channel, }); + size += 1; } else { // else update const updateData = { @@ -453,9 +560,12 @@ export class Lnv3Service implements OnModuleInit { }); } latestNonce += 1; + size += 1; this.fetchCache[index].latestRelayerInfoNonce = latestNonce; + } + if (size > 0) { this.logger.log( - `lnv3 [${transfer.chain}->${toChain.chain}] update relayer info, id ${id}, type ${record.updateType}, margin ${record.penalty}, basefee ${record.baseFee}, liquidityFee ${record.liquidityFeeRate}` + `lnv3 [${transfer.chain}] update relayer info, size: ${size}` ); } } catch (error) {