Skip to content

Commit

Permalink
use multi lvl0 indexers
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoch05 committed Jul 24, 2024
1 parent 0242d98 commit 928402b
Show file tree
Hide file tree
Showing 6 changed files with 454 additions and 136 deletions.
10 changes: 7 additions & 3 deletions apollo/src/base/TransferServiceT2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export enum RecordStatus {
failed,
}

export enum Level0Indexer {
export enum Level0IndexerType {
thegraph,
ponder,
}
Expand All @@ -37,9 +37,13 @@ export interface BridgeBaseConfigure {
takeEachTime: number;
}

export interface Level0Indexer {
indexerType: Level0IndexerType;
url: string;
}

export interface PartnerT2 {
level0Indexer: number;
indexerUrl: string;
level0Indexers: Level0Indexer[];
chainConfig: HelixChainConf;
}

Expand Down
199 changes: 105 additions & 94 deletions apollo/src/lnv3/lnv3.service.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import axios from 'axios';
import { last } from 'lodash';
import { AggregationService } from '../aggregation/aggregation.service';
import { PartnerT2, RecordStatus, Level0Indexer } from '../base/TransferServiceT2';
import { PartnerT2, RecordStatus, Level0IndexerType } from '../base/TransferServiceT2';
import { TasksService } from '../tasks/tasks.service';
import { TransferService } from './transfer.service';
import { ChainToken, ChainMessager, ChainCouple } from '@helixbridge/helixconf';
import { Lnv3ThegraphService } from './source/thegraph.service';
import { Lnv3PonderService } from './source/ponder.service';

export enum RelayUpdateType {
PROVIDER_UPDATE,
Expand All @@ -28,13 +29,17 @@ export class Lnv3Service implements OnModuleInit {

private readonly takeEachTime = 2;
private skip = new Array(this.transferService.transfers.length).fill(0);
private sourceServices = new Map();

constructor(
public configService: ConfigService,
private aggregationService: AggregationService,
private taskService: TasksService,
private transferService: TransferService
) {}
) {
this.sourceServices.set(Level0IndexerType.thegraph, new Lnv3ThegraphService());
this.sourceServices.set(Level0IndexerType.ponder, new Lnv3PonderService());
}

async onModuleInit() {
this.transferService.transfers.forEach((item, index) => {
Expand Down Expand Up @@ -88,83 +93,91 @@ export class Lnv3Service implements OnModuleInit {
}

async queryRecordInfo(transfer: PartnerT2, latestNonce: number) {
if (transfer.level0Indexer === Level0Indexer.ponder) {
const url = this.transferService.ponderEndpoint;
const query = `query { lnv3TransferRecords(limit: 50, orderBy: "nonce", orderDirection: "asc", where: {localChainId: "${transfer.chainConfig.id}", nonce_gt: "${latestNonce}"}) { items { id, nonce, messageNonce, remoteChainId, provider, sourceToken, targetToken, sourceAmount, targetAmount, sender, receiver, timestamp, transactionHash, fee, transferId, hasWithdrawn } }}`;
return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3TransferRecords.items);
} else {
const url = transfer.indexerUrl;
const query = `query { lnv3TransferRecords(first: 20, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, nonce, messageNonce, remoteChainId, provider, sourceToken, targetToken, sourceAmount, targetAmount, sender, receiver, timestamp, transactionHash, fee, transferId, hasWithdrawn } }`;
return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3TransferRecords);
let result = [];
for (const level0Indexer of transfer.level0Indexers) {
const url = level0Indexer.url;
const service = this.sourceServices.get(level0Indexer.indexerType);
try {
const response = await service.queryRecordInfo(
level0Indexer.url,
transfer.chainConfig.id,
latestNonce
);
if (response && response.length >= result.length) {
result = response;
}
} catch (err) {
this.logger.warn(
`try to get records failed, id ${transfer.chainConfig.id}, type ${level0Indexer.indexerType}`
);
}
}
return result;
}

async queryProviderInfo(transfer: PartnerT2, latestNonce: number) {
if (transfer.level0Indexer === Level0Indexer.ponder) {
const url = this.transferService.ponderEndpoint;
const query = `query { lnv3RelayUpdateRecords(limit: 50, orderBy: "nonce", orderDirection: "asc", where: {localChainId: "${transfer.chainConfig.id}", nonce_gt: "${latestNonce}"}) { items { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, penalty, baseFee, liquidityFeeRate, transferLimit, paused } }}`;

return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3RelayUpdateRecords.items);
} else {
const query = `query { lnv3RelayUpdateRecords(first: 50, orderBy: nonce, orderDirection: asc, skip: ${latestNonce}) { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, penalty, baseFee, liquidityFeeRate, transferLimit, paused } }`;
return await axios
.post(transfer.indexerUrl, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3RelayUpdateRecords);
let result = [];
for (const level0Indexer of transfer.level0Indexers) {
const url = level0Indexer.url;
const service = this.sourceServices.get(level0Indexer.indexerType);
try {
const response = await service.queryProviderInfo(
level0Indexer.url,
transfer.chainConfig.id,
latestNonce
);
if (response && response.length >= result.length) {
result = response;
}
} catch (err) {
this.logger.warn(
`try to get provider infos failed, id ${transfer.chainConfig.id}, type ${level0Indexer.indexerType}`
);
}
}
return result;
}

async queryRecordRelayStatus(toChain: PartnerT2, transferId: string) {
const url =
toChain.level0Indexer === Level0Indexer.ponder
? this.transferService.ponderEndpoint
: toChain.indexerUrl;
const id =
toChain.level0Indexer === Level0Indexer.ponder
? `${toChain.chainConfig.id}-${transferId}`
: transferId;
const query = `query { lnv3RelayRecord(id: "${id}") { id, relayer, timestamp, transactionHash, slashed, requestWithdrawTimestamp, fee }}`;
return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3RelayRecord);
for (const level0Indexer of toChain.level0Indexers) {
const url = level0Indexer.url;
const service = this.sourceServices.get(level0Indexer.indexerType);
try {
const response = await service.queryRelayStatus(
level0Indexer.url,
toChain.chainConfig.id,
transferId
);
if (response) {
return response;
}
} catch (err) {
this.logger.warn(
`try to get relay status failed, id ${toChain.chainConfig.id}, type ${level0Indexer.indexerType}`
);
}
}
}

async queryRecordWithdrawStatus(transfer: PartnerT2, transferId: string) {
const url =
transfer.level0Indexer === Level0Indexer.ponder
? this.transferService.ponderEndpoint
: transfer.indexerUrl;
const id =
transfer.level0Indexer === Level0Indexer.ponder
? `${transfer.chainConfig.id}-${transferId}`
: transferId;
const query = `query { lnv3TransferRecord(id: "${id}") { id, hasWithdrawn }}`;
return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3TransferRecord);
for (const level0Indexer of transfer.level0Indexers) {
const url = level0Indexer.url;
const service = this.sourceServices.get(level0Indexer.indexerType);
try {
const response = await service.queryWithdrawStatus(
level0Indexer.url,
transfer.chainConfig.id,
transferId
);
if (response) {
return response;
}
} catch (err) {
this.logger.warn(
`try to get withdraw status failed, id ${transfer.chainConfig.id}, type ${level0Indexer.indexerType}`
);
}
}
}

async fetchRecords(transfer: PartnerT2, index: number) {
Expand Down Expand Up @@ -193,9 +206,6 @@ export class Lnv3Service implements OnModuleInit {
this.fetchCache[index].latestNonce = latestNonce;
continue;
}
if (toChain.indexerUrl === null) {
continue;
}
const fromToken = this.getTokenInfo(transfer, record.sourceToken);
const toToken = this.getTokenInfo(toChain, record.targetToken);

Expand Down Expand Up @@ -256,25 +266,26 @@ export class Lnv3Service implements OnModuleInit {

// batch get status from target chain on sycing historical phase
async queryFillInfos(transfer: PartnerT2, latestTimestamp: number) {
if (transfer.level0Indexer === Level0Indexer.ponder) {
const url = this.transferService.ponderEndpoint;
const query = `query { lnv3RelayRecords(limit: 50, orderBy: "timestamp", orderDirection: "asc", where: {localChainId: "${transfer.chainConfig.id}", slashed: false, timestamp_gt: "${latestTimestamp}"}) { items { id, timestamp, requestWithdrawTimestamp, relayer, transactionHash, slashed, fee } }}`;
return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3RelayRecords.items);
} else {
const url = transfer.indexerUrl;
const query = `query { lnv3RelayRecords(first: 20, orderBy: timestamp, orderDirection: asc, where: {timestamp_gt: "${latestTimestamp}", slashed: false}) { id, timestamp, requestWithdrawTimestamp, relayer, transactionHash, slashed, fee } }`;
return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3RelayRecords);
let result = [];
for (const level0Indexer of transfer.level0Indexers) {
const url = level0Indexer.url;
const service = this.sourceServices.get(level0Indexer.indexerType);
try {
const response = await service.batchQueryRelayStatus(
level0Indexer.url,
transfer.chainConfig.id,
latestTimestamp
);
if (response && response.length >= result.length) {
result = response;
}
} catch (err) {
this.logger.warn(
`try to batch get fill infos failed, id ${transfer.chainConfig.id}, type ${level0Indexer.indexerType}`
);
}
}
return result;
}

async batchFetchStatus(transfer: PartnerT2, index: number) {
Expand Down Expand Up @@ -427,9 +438,9 @@ export class Lnv3Service implements OnModuleInit {
});

size += 1;
//this.logger.log(
//`lnv3 [${transfer.chain}->${toChain.chain}] new status id: ${record.id} relayed responseTxHash: ${relayRecord.transactionHash}`
//);
this.logger.log(
`lnv3 [${transfer.chainConfig.code}->${toChain.chainConfig.code}] new status id: ${record.id} relayed responseTxHash: ${relayRecord.transactionHash}`
);
}
// query withdrawLiquidity result
if (needWithdrawLiquidity && requestWithdrawTimestamp > 0) {
Expand Down Expand Up @@ -514,7 +525,7 @@ export class Lnv3Service implements OnModuleInit {
const records = await this.queryProviderInfo(transfer, latestNonce);
// maybe this query is archived and can't access
if (records === undefined) {
this.logger.warn(`query record failed, url: ${transfer.indexerUrl}`);
this.logger.warn(`query record failed, id: ${transfer.chainConfig.id}`);
return;
}

Expand Down
76 changes: 76 additions & 0 deletions apollo/src/lnv3/source/ponder.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import axios from 'axios';
import {
Lnv3Record,
Lnv3UpdateRecords,
Lnv3RelayRecord,
Lnv3WithdrawStatus,
SourceService,
} from './source.service';

export class Lnv3PonderService extends SourceService {
async queryRecordInfo(url: string, localId: number, latestNonce: number): Promise<Lnv3Record[]> {
const query = `query { lnv3TransferRecords(limit: 20, orderBy: "nonce", orderDirection: "asc", where: {localChainId: "${localId}", nonce_gt: "${latestNonce}"}) { items { id, nonce, messageNonce, remoteChainId, provider, sourceToken, targetToken, sourceAmount, targetAmount, sender, receiver, timestamp, transactionHash, fee, transferId, hasWithdrawn } }}`;
return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3TransferRecords.items);
}

async queryProviderInfo(
url: string,
localId: number,
latestNonce: number
): Promise<Lnv3UpdateRecords[]> {
const query = `query { lnv3RelayUpdateRecords(limit: 20, orderBy: "nonce", orderDirection: "asc", where: {localChainId: "${localId}", nonce_gt: "${latestNonce}"}) { items { id, updateType, remoteChainId, provider, transactionHash, timestamp, sourceToken, targetToken, penalty, baseFee, liquidityFeeRate, transferLimit, paused } }}`;
return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3RelayUpdateRecords.items);
}

async queryRelayStatus(
url: string,
localId: number,
transferId: string
): Promise<Lnv3RelayRecord> {
const id = `${localId}-${transferId}`;
const query = `query { lnv3RelayRecord(id: "${id}") { id, relayer, timestamp, transactionHash, slashed, requestWithdrawTimestamp, fee }}`;
return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3RelayRecord);
}
async batchQueryRelayStatus(
url: string,
localId: number,
latestTimestamp: number
): Promise<Lnv3RelayRecord[]> {
const query = `query { lnv3RelayRecords(limit: 20, orderBy: "timestamp", orderDirection: "asc", where: {localChainId: "${localId}", slashed: false, timestamp_gt: "${latestTimestamp}"}) { items { id, timestamp, requestWithdrawTimestamp, relayer, transactionHash, slashed, fee } }}`;
return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3RelayRecords.items);
}
async queryWithdrawStatus(
url: string,
localId: number,
transferId: string
): Promise<Lnv3WithdrawStatus> {
const id = `${localId}-${transferId}`;
const query = `query { lnv3TransferRecord(id: "${id}") { id, hasWithdrawn }}`;
return await axios
.post(url, {
query: query,
variables: null,
})
.then((res) => res.data?.data?.lnv3TransferRecord);
}
}
Loading

0 comments on commit 928402b

Please sign in to comment.