Skip to content

Commit

Permalink
batch fetch transfer status
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoch05 committed May 10, 2024
1 parent eb72c34 commit da53aa9
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 22 deletions.
2 changes: 1 addition & 1 deletion apollo/src/aggregation/aggregation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export class AggregationService extends PrismaClient implements OnModuleInit {
skip,
take,
where,
orderBy,
orderBy: orderBy ?? { startTime: Prisma.SortOrder.asc },
});
const total = await this.historyRecord.count({ where });

Expand Down
113 changes: 101 additions & 12 deletions apollo/src/lnv2/lnv2.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export interface FetchCacheRelayInfo extends FetchCacheInfo {
confirmedNonce: string;
latestRelayerInfoNonce: number;
latestRelayerInfoTargetNonce: number;
latestFillInfoTimestamp: number;
}

export interface BridgeIndexInfo {
Expand All @@ -48,7 +49,7 @@ export class Lnv2Service implements OnModuleInit {
name: 'lnBridgeV2',
fetchHistoryDataFirst: 10,
fetchSendDataInterval: 10000,
takeEachTime: 5,
takeEachTime: 2,
};

// default cache, opposite cache
Expand All @@ -59,6 +60,7 @@ export class Lnv2Service implements OnModuleInit {
confirmedNonce: '0',
latestRelayerInfoNonce: -1,
latestRelayerInfoTargetNonce: -1,
latestFillInfoTimestamp: -1,
isSyncingHistory: false,
skip: 0,
}));
Expand Down Expand Up @@ -122,6 +124,7 @@ export class Lnv2Service implements OnModuleInit {
await this.repairReorg(item, indexInfo);
// from source chain
await this.fetchRecords(item, indexInfo);
await this.batchFetchStatus(item, indexInfo);
// from target chain
await this.fetchStatus(item, indexInfo);
this.fetchCache[index].isSyncingHistory = false;
Expand Down Expand Up @@ -269,7 +272,7 @@ export class Lnv2Service implements OnModuleInit {
);
latestNonce = firstRecord ? Number(firstRecord.nonce) : 0;
}
const query = `query { lnv2TransferRecords(first: 20, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, remoteChainId, nonce, provider, sender, receiver, sourceToken, targetToken, amount, transactionHash, timestamp, fee } }`;
const query = `query { lnv2TransferRecords(first: 30, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, remoteChainId, nonce, provider, sender, receiver, sourceToken, targetToken, amount, transactionHash, timestamp, fee } }`;

const records = await axios
.post(indexInfo.url, {
Expand Down Expand Up @@ -357,6 +360,84 @@ export class Lnv2Service implements OnModuleInit {
}
}

// batch get status from target chain on sycing historical phase
async queryFillInfos(indexInfo: BridgeIndexInfo, latestTimestamp: number) {
const url = indexInfo.url;
const query = `query { lnv2RelayRecords(first: 30, orderBy: timestamp, orderDirection: asc, where: {timestamp_gt: "${latestTimestamp}", slasher: null}) { id, timestamp, transactionHash, fee } }`;
return await axios
.post(url, {
query: query,
variables: null,
}).then((res) => res.data?.data?.lnv2RelayRecords);
}

async batchFetchStatus(transfer: PartnerT3, indexInfo: BridgeIndexInfo) {
try {
let latestTimestamp = this.fetchCache[indexInfo.index].latestFillInfoTimestamp;
// stop sync history when timestamp set to zero
if (latestTimestamp === 0) {
return;
} else if (latestTimestamp === -1) {
const firstRecord = await this.aggregationService.queryHistoryRecordFirst(
{
toChain: transfer.chainName,
bridge: this.bridgeName(indexInfo),
result: RecordStatus.success,
},
{ nonce: 'desc' }
);
latestTimestamp = firstRecord ? firstRecord.endTime : -1;
}
const relayRecords = await this.queryFillInfos(indexInfo, latestTimestamp);
if (relayRecords.length === 0) {
this.fetchCache[indexInfo.index].latestFillInfoTimestamp = 0;
this.logger.log(`lnv2 the batch sync end, chain: ${transfer.chainName}, lastTime: ${latestTimestamp}`);
return;
}
let batchAddCount = 0;
for (const relayRecord of relayRecords) {
// ignore slashed transfer
let rmvedTransferId = relayRecord.id;
if (rmvedTransferId.startsWith(`${transfer.chainId}-`)) {
rmvedTransferId = rmvedTransferId.replace(`${transfer.chainId}-`, '');
}
const uncheckedRecord = await this.aggregationService
.queryHistoryRecordFirst({
id: {
endsWith: rmvedTransferId,
}
});
// the record exist but not finished
if (uncheckedRecord?.endTxHash === '' ) {
const updateData = {
result: RecordStatus.success,
responseTxHash: relayRecord.transactionHash,
endTxHash: relayRecord.transactionHash,
endTime: Number(relayRecord.timestamp),
};

await this.aggregationService.updateHistoryRecord({
where: { id: uncheckedRecord.id },
data: updateData,
});
this.fetchCache[indexInfo.index].latestFillInfoTimestamp = updateData.endTime;
batchAddCount += 1;
} else if (uncheckedRecord) {
this.fetchCache[indexInfo.index].latestFillInfoTimestamp = Number(relayRecord.timestamp);
}
}
if (batchAddCount > 0) {
this.logger.log(
`lnv2 [${transfer.chainName}] batch fetch status, count: ${batchAddCount}`
);
}
} catch(error) {
this.logger.warn(`batch fetch lnv2 status failed, error ${error}`);
}
}



// fetch status from target chain and source chain(slash result)
// 1. relayed, finished
// 2. cancel inited, save timestamp to check if users can cancel tx or ln can relay msg
Expand All @@ -372,7 +453,7 @@ export class Lnv2Service implements OnModuleInit {
fromChain: transfer.chainName,
bridge: this.bridgeName(indexInfo),
endTxHash: '',
},
}
})
.then((result) => result.records);

Expand Down Expand Up @@ -529,7 +610,7 @@ export class Lnv2Service implements OnModuleInit {
);
latestNonce = firstRecord ? Number(firstRecord.targetNonce) : 0;
}
const query = `query { lnv2RelayUpdateRecords(first: 20, orderBy: nonce, orderDirection: asc, where: {updateType_in: [${RelayUpdateType.SLASH}, ${RelayUpdateType.WITHDRAW}]}, skip: ${latestNonce}) { id, remoteChainId, provider, margin, updateType, withdrawNonce, transactionHash, timestamp, sourceToken, targetToken } }`;
const query = `query { lnv2RelayUpdateRecords(first: 30, orderBy: nonce, orderDirection: asc, where: {updateType_in: [${RelayUpdateType.SLASH}, ${RelayUpdateType.WITHDRAW}]}, skip: ${latestNonce}) { id, remoteChainId, provider, margin, updateType, withdrawNonce, transactionHash, timestamp, sourceToken, targetToken } }`;
const records = await axios
.post(indexInfo.url, {
query: query,
Expand Down Expand Up @@ -652,7 +733,7 @@ export class Lnv2Service implements OnModuleInit {
);
latestNonce = firstRecord ? Number(firstRecord.nonce) : 0;
}
const query = `query { lnv2RelayUpdateRecords(first: 20, orderBy: nonce, orderDirection: asc, where: {updateType: ${RelayUpdateType.PROVIDER_UPDATE}}, skip: ${latestNonce}) { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, baseFee, liquidityFeeRate } }`;
const query = `query { lnv2RelayUpdateRecords(first: 30, orderBy: nonce, orderDirection: asc, where: {updateType: ${RelayUpdateType.PROVIDER_UPDATE}}, skip: ${latestNonce}) { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, baseFee, liquidityFeeRate } }`;

const records = await axios
.post(indexInfo.url, {
Expand All @@ -668,6 +749,7 @@ export class Lnv2Service implements OnModuleInit {
this.logger.warn(`query record failed, url: ${indexInfo.url}, query: ${query}`);
return;
}
let size = 0;
// query nonce big then latestNonce
for (const record of records) {
// query by relayer
Expand Down Expand Up @@ -740,9 +822,12 @@ export class Lnv2Service implements OnModuleInit {
}
latestNonce += 1;
this.fetchCache[index].latestRelayerInfoNonce = latestNonce;
this.logger.log(
`update lnv2 relay info, id ${id}, type ${record.updateType}, basefee ${record.baseFee}, liquidityFee ${record.liquidityFeeRate}`
);
size += 1;
}
if (size > 0) {
this.logger.log(
`update lnv2 relay info from source, size: ${size}`
);
}
} catch (error) {
this.logger.warn(`fetchFeeInfoFromSource failed, error ${error}`);
Expand All @@ -764,7 +849,7 @@ export class Lnv2Service implements OnModuleInit {
);
latestNonce = firstRecord ? Number(firstRecord.nonce) : 0;
}
const query = `query { lnv2RelayUpdateRecords(first: 20, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, margin, baseFee, liquidityFeeRate } }`;
const query = `query { lnv2RelayUpdateRecords(first: 30, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, margin, baseFee, liquidityFeeRate } }`;

const records = await axios
.post(indexInfo.url, {
Expand All @@ -780,6 +865,7 @@ export class Lnv2Service implements OnModuleInit {
this.logger.warn(`query record failed, url: ${indexInfo.url}, query: ${query}`);
return;
}
let size = 0;
// query nonce big then latestNonce
for (const record of records) {
// query by relayer
Expand Down Expand Up @@ -862,9 +948,12 @@ export class Lnv2Service implements OnModuleInit {
}
latestNonce += 1;
this.fetchCache[index].latestRelayerInfoNonce = latestNonce;
this.logger.log(
`update lnv2 relay info, id ${id}, type ${record.updateType}, margin ${record.margin}, basefee ${record.baseFee}, liquidityFee ${record.liquidityFeeRate}`
);
size += 1;
}
if (size > 0) {
this.logger.log(
`update lnv2 relay info for opposite, size ${size}`
);
}
} catch (error) {
this.logger.warn(`fetchRelayInfo failed, error ${error}`);
Expand Down
Loading

0 comments on commit da53aa9

Please sign in to comment.