diff --git a/src/modules/analytics-indexer/analytics.indexer.module.ts b/src/modules/analytics-indexer/analytics.indexer.module.ts new file mode 100644 index 000000000..10bf1d0fc --- /dev/null +++ b/src/modules/analytics-indexer/analytics.indexer.module.ts @@ -0,0 +1,38 @@ +import { Module } from '@nestjs/common'; +import { PairModule } from '../pair/pair.module'; +import { RouterModule } from '../router/router.module'; +import { PriceDiscoveryModule } from '../price-discovery/price.discovery.module'; +import { TokenModule } from '../tokens/token.module'; +import { ElasticSearchModule } from 'src/services/elastic-search/elastic.search.module'; +import { IndexerService } from './services/indexer.service'; +import { IndexerStateService } from './services/indexer.state.service'; +import { IndexerPairService } from './services/indexer.pair.service'; +import { IndexerRouterService } from './services/indexer.router.service'; +import { IndexerTokenService } from './services/indexer.token.service'; +import { IndexerPriceDiscoveryService } from './services/indexer.price.discovery.service'; +import { IndexerSwapHandlerService } from './services/event-handlers/indexer.swap.handler.service'; +import { IndexerLiquidityHandlerService } from './services/event-handlers/indexer.liquidity.handler.service'; +import { IndexerPriceDiscoveryHandlerService } from './services/event-handlers/indexer.price.discovery.handler.service'; + +@Module({ + imports: [ + PairModule, + RouterModule, + TokenModule, + PriceDiscoveryModule, + ElasticSearchModule, + ], + providers: [ + IndexerService, + IndexerStateService, + IndexerPairService, + IndexerRouterService, + IndexerTokenService, + IndexerPriceDiscoveryService, + IndexerSwapHandlerService, + IndexerLiquidityHandlerService, + IndexerPriceDiscoveryHandlerService, + ], + exports: [IndexerService], +}) +export class AnalyticsIndexerModule {} diff --git a/src/modules/analytics-indexer/entities/indexer.event.types.ts b/src/modules/analytics-indexer/entities/indexer.event.types.ts new file mode 100644 index 000000000..f72d91c42 --- /dev/null +++ b/src/modules/analytics-indexer/entities/indexer.event.types.ts @@ -0,0 +1,14 @@ +export enum IndexerEventIdentifiers { + SWAP_FIXED_INPUT = 'swapTokensFixedInput', + SWAP_FIXED_OUTPUT = 'swapTokensFixedOutput', + ADD_LIQUIDITY = 'addLiquidity', + REMOVE_LIQUIDITY = 'removeLiquidity', + PRICE_DISCOVERY_DEPOSIT = 'deposit', + PRICE_DISCOVERY_WITHDRAW = 'withdraw', +} + +export enum IndexerEventTypes { + SWAP_EVENTS = 'SWAP_EVENTS', + LIQUIDITY_EVENTS = 'LIQUIDITY_EVENTS', + PRICE_DISCOVERY_EVENTS = 'PRICE_DISCOVERY_EVENTS', +} diff --git a/src/modules/analytics-indexer/entities/pair.metadata.ts b/src/modules/analytics-indexer/entities/pair.metadata.ts new file mode 100644 index 000000000..6c479bfa5 --- /dev/null +++ b/src/modules/analytics-indexer/entities/pair.metadata.ts @@ -0,0 +1,12 @@ +import { EsdtToken } from 'src/modules/tokens/models/esdtToken.model'; + +export class PairMetadata { + address: string; + firstToken: EsdtToken; + secondToken: EsdtToken; + totalFeePercent: number; + + constructor(init?: Partial) { + Object.assign(this, init); + } +} diff --git a/src/modules/analytics-indexer/entities/price.discovery.metadata.ts b/src/modules/analytics-indexer/entities/price.discovery.metadata.ts new file mode 100644 index 000000000..eea2cb68d --- /dev/null +++ b/src/modules/analytics-indexer/entities/price.discovery.metadata.ts @@ -0,0 +1,11 @@ +import { EsdtToken } from 'src/modules/tokens/models/esdtToken.model'; + +export class PriceDiscoveryMetadata { + address: string; + launchedToken: EsdtToken; + acceptedToken: EsdtToken; + + constructor(init?: Partial) { + Object.assign(this, init); + } +} diff --git a/src/modules/analytics-indexer/global.state.ts b/src/modules/analytics-indexer/global.state.ts new file mode 100644 index 000000000..c97c8df53 --- /dev/null +++ b/src/modules/analytics-indexer/global.state.ts @@ -0,0 +1,13 @@ +export class PairState { + firstTokenID: string; + secondTokenID: string; + firstTokenReserves: string; + secondTokenReserves: string; + liquidityPoolSupply: string; +} + +export class GlobalStateSingleton { + public pairsState: { [key: string]: PairState } = {}; +} + +export const GlobalState = new GlobalStateSingleton(); diff --git a/src/modules/analytics-indexer/services/event-handlers/indexer.liquidity.handler.service.ts b/src/modules/analytics-indexer/services/event-handlers/indexer.liquidity.handler.service.ts new file mode 100644 index 000000000..86ecb9f94 --- /dev/null +++ b/src/modules/analytics-indexer/services/event-handlers/indexer.liquidity.handler.service.ts @@ -0,0 +1,110 @@ +import { Injectable } from '@nestjs/common'; +import { + AddLiquidityEvent, + RemoveLiquidityEvent, +} from '@multiversx/sdk-exchange'; +import { computeValueUSD } from 'src/utils/token.converters'; +import { GlobalState } from '../../global.state'; +import { IndexerStateService } from '../indexer.state.service'; +import { IndexerPairService } from '../indexer.pair.service'; +import { IndexerRouterService } from '../indexer.router.service'; +import { IndexerTokenService } from '../indexer.token.service'; + +@Injectable() +export class IndexerLiquidityHandlerService { + constructor( + private readonly stateService: IndexerStateService, + private readonly pairService: IndexerPairService, + private readonly routerService: IndexerRouterService, + private readonly tokenService: IndexerTokenService, + ) {} + + public handleLiquidityEvent( + event: AddLiquidityEvent | RemoveLiquidityEvent, + ): [any[], number] { + const pair = this.stateService.getPairMetadata(event.address); + if (!pair) { + return [[], 0]; + } + + this.updatePairStateForLiquidityEvent(event); + + const firstTokenPriceUSD = + this.tokenService.computeTokenPriceDerivedUSD( + event.getFirstToken().tokenID, + ); + const secondTokenPriceUSD = + this.tokenService.computeTokenPriceDerivedUSD( + event.getSecondToken().tokenID, + ); + const newTotalLockedValueUSD = + this.routerService.computeTotalLockedValueUSD(); + + const data = []; + data['factory'] = { + totalLockedValueUSD: newTotalLockedValueUSD.toFixed(), + }; + const firstTokenLockedValueUSD = computeValueUSD( + event.getFirstTokenReserves().toFixed(), + pair.firstToken.decimals, + firstTokenPriceUSD, + ); + const secondTokenLockedValueUSD = computeValueUSD( + event.getSecondTokenReserves().toFixed(), + pair.secondToken.decimals, + secondTokenPriceUSD, + ); + const lockedValueUSD = firstTokenLockedValueUSD.plus( + secondTokenLockedValueUSD, + ); + + data[event.address] = { + firstTokenLocked: event.getFirstTokenReserves().toFixed(), + firstTokenLockedValueUSD: firstTokenLockedValueUSD.toFixed(), + secondTokenLocked: event.getSecondTokenReserves().toFixed(), + secondTokenLockedValueUSD: secondTokenLockedValueUSD.toFixed(), + lockedValueUSD: lockedValueUSD.toFixed(), + liquidity: event.getLiquidityPoolSupply().toFixed(), + }; + + const firstTokenTotalLockedValue = + this.pairService.getTokenTotalLockedValue( + pair.firstToken.identifier, + ); + const secondTokenTotalLockedValue = + this.pairService.getTokenTotalLockedValue( + pair.secondToken.identifier, + ); + + data[pair.firstToken.identifier] = { + lockedValue: firstTokenTotalLockedValue, + lockedValueUSD: computeValueUSD( + firstTokenTotalLockedValue, + pair.firstToken.decimals, + firstTokenPriceUSD, + ).toFixed(), + }; + data[pair.secondToken.identifier] = { + lockedValue: secondTokenTotalLockedValue, + lockedValueUSD: computeValueUSD( + secondTokenTotalLockedValue, + pair.secondToken.decimals, + secondTokenPriceUSD, + ).toFixed(), + }; + + return [data, event.getTimestamp().toNumber()]; + } + + private updatePairStateForLiquidityEvent( + event: AddLiquidityEvent | RemoveLiquidityEvent, + ): void { + GlobalState.pairsState[event.address] = { + firstTokenID: event.getFirstToken().tokenID, + secondTokenID: event.getSecondToken().tokenID, + firstTokenReserves: event.getFirstTokenReserves().toString(), + secondTokenReserves: event.getSecondTokenReserves().toString(), + liquidityPoolSupply: event.getLiquidityPoolSupply().toString(), + }; + } +} diff --git a/src/modules/analytics-indexer/services/event-handlers/indexer.price.discovery.handler.service.ts b/src/modules/analytics-indexer/services/event-handlers/indexer.price.discovery.handler.service.ts new file mode 100644 index 000000000..55a2ede27 --- /dev/null +++ b/src/modules/analytics-indexer/services/event-handlers/indexer.price.discovery.handler.service.ts @@ -0,0 +1,49 @@ +import { DepositEvent, WithdrawEvent } from '@multiversx/sdk-exchange'; +import { Injectable } from '@nestjs/common'; +import { IndexerPriceDiscoveryService } from '../indexer.price.discovery.service'; + +@Injectable() +export class IndexerPriceDiscoveryHandlerService { + constructor( + private readonly priceDiscoveryService: IndexerPriceDiscoveryService, + ) {} + + handlePriceDiscoveryEvent( + event: DepositEvent | WithdrawEvent, + ): [any[], number] { + const [ + priceDiscoveryAddress, + launchedTokenAmount, + acceptedTokenAmount, + launchedTokenPrice, + ] = [ + event.getAddress(), + event.launchedTokenAmount.toFixed(), + event.acceptedTokenAmount.toFixed(), + event.launchedTokenPrice, + ]; + + const acceptedTokenPrice = + this.priceDiscoveryService.computeAcceptedTokenPrice( + priceDiscoveryAddress, + event, + ); + const launchedTokenPriceUSD = + this.priceDiscoveryService.computeLaunchedTokenPriceUSD( + priceDiscoveryAddress, + event, + ); + + const data = []; + const timestamp = event.getTopics().toJSON().timestamp; + data[priceDiscoveryAddress] = { + launchedTokenAmount, + acceptedTokenAmount, + launchedTokenPrice, + acceptedTokenPrice, + launchedTokenPriceUSD, + }; + + return [data, timestamp]; + } +} diff --git a/src/modules/analytics-indexer/services/event-handlers/indexer.swap.handler.service.ts b/src/modules/analytics-indexer/services/event-handlers/indexer.swap.handler.service.ts new file mode 100644 index 000000000..888b48061 --- /dev/null +++ b/src/modules/analytics-indexer/services/event-handlers/indexer.swap.handler.service.ts @@ -0,0 +1,205 @@ +import { SwapEvent } from '@multiversx/sdk-exchange'; +import { Injectable } from '@nestjs/common'; +import BigNumber from 'bignumber.js'; +import { computeValueUSD } from 'src/utils/token.converters'; +import { GlobalState } from '../../global.state'; +import { IndexerStateService } from '../indexer.state.service'; +import { IndexerPairService } from '../indexer.pair.service'; +import { IndexerRouterService } from '../indexer.router.service'; + +@Injectable() +export class IndexerSwapHandlerService { + constructor( + private readonly stateService: IndexerStateService, + private readonly pairService: IndexerPairService, + private readonly routerService: IndexerRouterService, + ) {} + + public handleSwapEvent(event: SwapEvent): [any[], number] { + try { + const pair = this.stateService.getPairMetadata(event.address); + if (!pair) { + return [[], 0]; + } + + this.updateState(event); + + const [ + firstTokenAmount, + secondTokenAmount, + firstTokenReserve, + secondTokenReserve, + ] = + event.getTokenIn().tokenID === pair.firstToken.identifier + ? [ + event.getTokenIn().amount.toFixed(), + event.getTokenOut().amount.toFixed(), + event.getTokenInReserves().toFixed(), + event.getTokenOutReserves().toFixed(), + ] + : [ + event.getTokenOut().amount.toFixed(), + event.getTokenIn().amount.toFixed(), + event.getTokenOutReserves().toFixed(), + event.getTokenInReserves().toFixed(), + ]; + const firstTokenPrice = this.pairService.computeFirstTokenPrice( + event.address, + ); + const secondTokenPrice = this.pairService.computeSecondTokenPrice( + event.address, + ); + const firstTokenPriceUSD = + this.pairService.computeFirstTokenPriceUSD(event.address); + const secondTokenPriceUSD = + this.pairService.computeSecondTokenPriceUSD(event.address); + const liquidity = this.pairService.getTotalSupply(event.address); + + const newTotalLockedValueUSD = + this.routerService.computeTotalLockedValueUSD(); + + const firstTokenValues = { + firstTokenPrice, + firstTokenLocked: firstTokenReserve, + firstTokenLockedValueUSD: computeValueUSD( + firstTokenReserve, + pair.firstToken.decimals, + firstTokenPriceUSD, + ).toFixed(), + firstTokenVolume: firstTokenAmount, + }; + const secondTokenValues = { + secondTokenPrice, + secondTokenLocked: secondTokenReserve, + secondTokenLockedValueUSD: computeValueUSD( + secondTokenReserve, + pair.secondToken.decimals, + secondTokenPriceUSD, + ).toFixed(), + secondTokenVolume: secondTokenAmount, + }; + + const lockedValueUSD = new BigNumber( + firstTokenValues.firstTokenLockedValueUSD, + ) + .plus(secondTokenValues.secondTokenLockedValueUSD) + .toFixed(); + + const firstTokenVolumeUSD = computeValueUSD( + firstTokenValues.firstTokenVolume, + pair.firstToken.decimals, + firstTokenPriceUSD, + ); + const secondTokenVolumeUSD = computeValueUSD( + secondTokenValues.secondTokenVolume, + pair.secondToken.decimals, + secondTokenPriceUSD, + ); + const volumeUSD = firstTokenVolumeUSD + .plus(secondTokenVolumeUSD) + .dividedBy(2); + + const feesUSD = + event.getTokenIn().tokenID === pair.firstToken.identifier + ? computeValueUSD( + firstTokenAmount, + pair.firstToken.decimals, + firstTokenPriceUSD, + ).times(pair.totalFeePercent) + : computeValueUSD( + secondTokenAmount, + pair.secondToken.decimals, + secondTokenPriceUSD, + ).times(pair.totalFeePercent); + + const data = []; + data[event.address] = { + ...firstTokenValues, + ...secondTokenValues, + lockedValueUSD, + liquidity, + volumeUSD: volumeUSD.toFixed(), + feesUSD: feesUSD.toFixed(), + }; + + const firstTokenTotalLockedValue = + this.pairService.getTokenTotalLockedValue( + pair.firstToken.identifier, + ); + const secondTokenTotalLockedValue = + this.pairService.getTokenTotalLockedValue( + pair.secondToken.identifier, + ); + + data[pair.firstToken.identifier] = { + lockedValue: firstTokenTotalLockedValue, + lockedValueUSD: computeValueUSD( + firstTokenTotalLockedValue, + pair.firstToken.decimals, + firstTokenPriceUSD, + ).toFixed(), + priceUSD: firstTokenPriceUSD, + volume: firstTokenAmount, + volumeUSD: firstTokenVolumeUSD.toFixed(), + }; + data[pair.secondToken.identifier] = { + lockedValue: secondTokenTotalLockedValue, + lockedValueUSD: computeValueUSD( + secondTokenTotalLockedValue, + pair.secondToken.decimals, + secondTokenPriceUSD, + ).toFixed(), + priceUSD: secondTokenPriceUSD, + volume: secondTokenAmount, + volumeUSD: secondTokenVolumeUSD.toFixed(), + }; + + data['factory'] = { + totalLockedValueUSD: newTotalLockedValueUSD.toFixed(), + }; + + return [data, event.getTimestamp().toNumber()]; + } catch (error) { + throw error; + } + } + + private updateState(event: SwapEvent): void { + const firstToken = this.stateService.getFirstToken(event.address); + + if (!GlobalState.pairsState[event.address]) { + GlobalState.pairsState[event.address] = { + firstTokenID: firstToken.identifier, + secondTokenID: '', + firstTokenReserves: '0', + secondTokenReserves: '0', + liquidityPoolSupply: '0', + }; + } + + if ( + GlobalState.pairsState[event.address].firstTokenID === + event.getTokenIn().tokenID + ) { + GlobalState.pairsState[event.address] = { + firstTokenID: event.getTokenIn().tokenID, + secondTokenID: event.getTokenOut().tokenID, + firstTokenReserves: event.getTokenInReserves().toString(), + secondTokenReserves: event.getTokenOutReserves().toString(), + liquidityPoolSupply: + GlobalState.pairsState[event.address] + ?.liquidityPoolSupply ?? '0', + }; + } else { + GlobalState.pairsState[event.address] = { + firstTokenID: event.getTokenOut().tokenID, + secondTokenID: event.getTokenIn().tokenID, + firstTokenReserves: event.getTokenOutReserves().toString(), + secondTokenReserves: event.getTokenInReserves().toString(), + liquidityPoolSupply: + GlobalState.pairsState[event.address] + ?.liquidityPoolSupply ?? '0', + }; + } + } +} diff --git a/src/modules/analytics-indexer/services/indexer.pair.service.ts b/src/modules/analytics-indexer/services/indexer.pair.service.ts new file mode 100644 index 000000000..7a10a8fbc --- /dev/null +++ b/src/modules/analytics-indexer/services/indexer.pair.service.ts @@ -0,0 +1,203 @@ +import { forwardRef, Inject, Injectable } from '@nestjs/common'; +import BigNumber from 'bignumber.js'; +import { quote } from 'src/modules/pair/pair.utils'; +import { constantsConfig, mxConfig } from 'src/config'; +import { GlobalState, PairState } from '../global.state'; +import { IndexerTokenService } from './indexer.token.service'; +import { IndexerStateService } from './indexer.state.service'; +import { PairInfoModel } from 'src/modules/pair/models/pair-info.model'; + +@Injectable() +export class IndexerPairService { + constructor( + private readonly stateService: IndexerStateService, + @Inject(forwardRef(() => IndexerTokenService)) + private readonly tokenService: IndexerTokenService, + ) {} + + public getTokenTotalLockedValue(tokenID: string): string { + const pairs = this.stateService.getPairsMetadata(); + let newLockedValue = new BigNumber(0); + for (const pair of pairs) { + if ( + tokenID !== pair.firstToken.identifier && + tokenID !== pair.secondToken.identifier + ) { + continue; + } + + const tokenReserve = + tokenID === pair.firstToken.identifier + ? this.getFirstTokenReserve(pair.address) + : this.getSecondTokenReserve(pair.address); + + newLockedValue = newLockedValue.plus(tokenReserve); + } + + return newLockedValue.toFixed(); + } + + public getTokenPriceUSD(tokenID: string): string { + return this.tokenService.computeTokenPriceDerivedUSD(tokenID); + } + + public getFirstTokenReserve(pairAddress: string): string { + return GlobalState.pairsState[pairAddress].firstTokenReserves; + } + + public getSecondTokenReserve(pairAddress: string): string { + return GlobalState.pairsState[pairAddress].secondTokenReserves; + } + + public getPairState(pairAddress: string): PairState { + return GlobalState.pairsState[pairAddress]; + } + + public computeLockedValueUSD(pairAddress: string): BigNumber { + const firstTokenLockedValueUSD = + this.computeFirstTokenLockedValueUSD(pairAddress); + const secondTokenLockedValueUSD = + this.computeSecondTokenLockedValueUSD(pairAddress); + + return firstTokenLockedValueUSD.plus(secondTokenLockedValueUSD); + } + + private computeFirstTokenLockedValueUSD(pairAddress: string): BigNumber { + const firstToken = this.stateService.getFirstToken(pairAddress); + const firstTokenPriceUSD = this.computeFirstTokenPriceUSD(pairAddress); + const firstTokenReserve = this.getFirstTokenReserve(pairAddress); + + return new BigNumber(firstTokenReserve) + .multipliedBy(`1e-${firstToken.decimals}`) + .multipliedBy(firstTokenPriceUSD); + } + + private computeSecondTokenLockedValueUSD(pairAddress: string): BigNumber { + const secondToken = this.stateService.getSecondToken(pairAddress); + const secondTokenPriceUSD = + this.computeSecondTokenPriceUSD(pairAddress); + const secondTokenReserve = this.getSecondTokenReserve(pairAddress); + + return new BigNumber(secondTokenReserve) + .multipliedBy(`1e-${secondToken.decimals}`) + .multipliedBy(secondTokenPriceUSD); + } + + computeFirstTokenPriceUSD(pairAddress: string): string { + const { firstToken, secondToken } = + this.stateService.getPairMetadata(pairAddress); + + if (firstToken.identifier === constantsConfig.USDC_TOKEN_ID) { + return new BigNumber(1).toFixed(); + } + + if (secondToken.identifier === constantsConfig.USDC_TOKEN_ID) { + return this.computeFirstTokenPrice(pairAddress); + } + + return this.tokenService.computeTokenPriceDerivedUSD( + firstToken.identifier, + ); + } + + computeSecondTokenPriceUSD(pairAddress: string): string { + const { firstToken, secondToken } = + this.stateService.getPairMetadata(pairAddress); + + if (secondToken.identifier === constantsConfig.USDC_TOKEN_ID) { + return new BigNumber(1).toFixed(); + } + + if (firstToken.identifier === constantsConfig.USDC_TOKEN_ID) { + return this.computeSecondTokenPrice(pairAddress); + } + + return this.tokenService.computeTokenPriceDerivedUSD( + secondToken.identifier, + ); + } + + private getEquivalentForLiquidity( + pairAddress: string, + tokenInID: string, + amount: string, + ): string { + const { firstToken, secondToken } = + this.stateService.getPairMetadata(pairAddress); + const pairInfo = this.getPairInfoMetadata(pairAddress); + + const tokenIn = + tokenInID === mxConfig.EGLDIdentifier + ? constantsConfig.WEGLD_TOKEN_ID + : tokenInID; + + if (!pairInfo) { + return new BigNumber(0).toFixed(); + } + + switch (tokenIn) { + case firstToken.identifier: + return quote( + amount, + pairInfo.reserves0, + pairInfo.reserves1, + ).toFixed(); + case secondToken.identifier: + return quote( + amount, + pairInfo.reserves1, + pairInfo.reserves0, + ).toFixed(); + default: + return new BigNumber(0).toFixed(); + } + } + + private getPairInfoMetadata(pairAddress: string): PairInfoModel { + if (GlobalState.pairsState[pairAddress]) { + return new PairInfoModel({ + reserves0: + GlobalState.pairsState[pairAddress].firstTokenReserves, + reserves1: + GlobalState.pairsState[pairAddress].secondTokenReserves, + totalSupply: + GlobalState.pairsState[pairAddress].liquidityPoolSupply, + }); + } + return undefined; + } + + public computeFirstTokenPrice(pairAddress: string): string { + const { firstToken, secondToken } = + this.stateService.getPairMetadata(pairAddress); + + const firstTokenPrice = this.getEquivalentForLiquidity( + pairAddress, + firstToken.identifier, + new BigNumber(`1e${firstToken.decimals}`).toFixed(), + ); + + return new BigNumber(firstTokenPrice) + .multipliedBy(`1e-${secondToken.decimals}`) + .toFixed(); + } + + public computeSecondTokenPrice(pairAddress: string): string { + const { firstToken, secondToken } = + this.stateService.getPairMetadata(pairAddress); + + const secondTokenPrice = this.getEquivalentForLiquidity( + pairAddress, + secondToken.identifier, + new BigNumber(`1e${firstToken.decimals}`).toFixed(), + ); + + return new BigNumber(secondTokenPrice) + .multipliedBy(`1e-${firstToken.decimals}`) + .toFixed(); + } + + getTotalSupply(pairAddress: string): string { + return GlobalState.pairsState[pairAddress].liquidityPoolSupply; + } +} diff --git a/src/modules/analytics-indexer/services/indexer.price.discovery.service.ts b/src/modules/analytics-indexer/services/indexer.price.discovery.service.ts new file mode 100644 index 000000000..f1877bb50 --- /dev/null +++ b/src/modules/analytics-indexer/services/indexer.price.discovery.service.ts @@ -0,0 +1,78 @@ +import { Injectable } from '@nestjs/common'; +import { DepositEvent, WithdrawEvent } from '@multiversx/sdk-exchange'; +import { quote } from 'src/modules/pair/pair.utils'; +import { IndexerStateService } from './indexer.state.service'; +import { IndexerPairService } from './indexer.pair.service'; +import BigNumber from 'bignumber.js'; + +@Injectable() +export class IndexerPriceDiscoveryService { + constructor( + private readonly stateService: IndexerStateService, + private readonly pairService: IndexerPairService, + ) {} + + public computeAcceptedTokenPrice( + priceDiscoveryAddress: string, + event: DepositEvent | WithdrawEvent, + ): string { + const { launchedToken, acceptedToken } = + this.stateService.getPriceDiscoveryMetadata(priceDiscoveryAddress); + + const launchedTokenAmount = event.launchedTokenAmount.toString(); + const acceptedTokenAmount = event.acceptedTokenAmount.toString(); + + const acceptedTokenPrice = quote( + new BigNumber(`1e${acceptedToken.decimals}`).toFixed(), + acceptedTokenAmount, + launchedTokenAmount, + ); + + return new BigNumber(acceptedTokenPrice) + .multipliedBy(`1e-${launchedToken.decimals}`) + .toFixed(); + } + + private computeLaunchedTokenPrice( + priceDiscoveryAddress: string, + event: DepositEvent | WithdrawEvent, + ): string { + const { launchedToken, acceptedToken } = + this.stateService.getPriceDiscoveryMetadata(priceDiscoveryAddress); + + const launchedTokenAmount = event.launchedTokenAmount.toString(); + const acceptedTokenAmount = event.acceptedTokenAmount.toString(); + + const launchedTokenPrice = quote( + new BigNumber(`1e${launchedToken.decimals}`).toFixed(), + launchedTokenAmount, + acceptedTokenAmount, + ); + + return new BigNumber(launchedTokenPrice) + .multipliedBy(`1e-${acceptedToken.decimals}`) + .toFixed(); + } + + public computeLaunchedTokenPriceUSD( + priceDiscoveryAddress: string, + event: DepositEvent | WithdrawEvent, + ): string { + const { acceptedToken } = this.stateService.getPriceDiscoveryMetadata( + priceDiscoveryAddress, + ); + + const acceptedTokenPriceUSD = this.pairService.getTokenPriceUSD( + acceptedToken.identifier, + ); + + const launchedTokenPrice = this.computeLaunchedTokenPrice( + priceDiscoveryAddress, + event, + ); + + return new BigNumber(launchedTokenPrice) + .multipliedBy(acceptedTokenPriceUSD) + .toFixed(); + } +} diff --git a/src/modules/analytics-indexer/services/indexer.router.service.ts b/src/modules/analytics-indexer/services/indexer.router.service.ts new file mode 100644 index 000000000..1e71dc932 --- /dev/null +++ b/src/modules/analytics-indexer/services/indexer.router.service.ts @@ -0,0 +1,33 @@ +import { Injectable } from '@nestjs/common'; +import BigNumber from 'bignumber.js'; +import { GlobalState } from '../global.state'; +import { IndexerPairService } from './indexer.pair.service'; + +@Injectable() +export class IndexerRouterService { + constructor(private readonly pairService: IndexerPairService) {} + + public computeTotalLockedValueUSD(): BigNumber { + const pairsAddress = this.getAllPairAddresses(); + + let totalValueLockedUSD = new BigNumber(0); + for (const pairAddress of pairsAddress) { + const lockedValueUSDBig = + this.pairService.computeLockedValueUSD(pairAddress); + + totalValueLockedUSD = !lockedValueUSDBig.isNaN() + ? totalValueLockedUSD.plus(lockedValueUSDBig) + : totalValueLockedUSD; + } + + return totalValueLockedUSD; + } + + private getAllPairAddresses(): string[] { + const pairAddresses = []; + for (const pairAddress in GlobalState.pairsState) { + pairAddresses.push(pairAddress); + } + return pairAddresses; + } +} diff --git a/src/modules/analytics-indexer/services/indexer.service.ts b/src/modules/analytics-indexer/services/indexer.service.ts new file mode 100644 index 000000000..ef013be20 --- /dev/null +++ b/src/modules/analytics-indexer/services/indexer.service.ts @@ -0,0 +1,293 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { scAddress } from 'src/config'; +import { ElasticSearchEventsService } from 'src/services/elastic-search/services/es.events.service'; +import { convertEventTopicsAndDataToBase64 } from 'src/utils/elastic.search.utils'; +import { AnalyticsWriteService } from 'src/services/analytics/services/analytics.write.service'; +import { WINSTON_MODULE_PROVIDER } from 'nest-winston'; +import { Logger } from 'winston'; +import BigNumber from 'bignumber.js'; +import { RawElasticEventType } from 'src/services/elastic-search/entities/raw.elastic.event'; +import { + AddLiquidityEvent, + DepositEvent, + RemoveLiquidityEvent, + SwapEvent, + WithdrawEvent, +} from '@multiversx/sdk-exchange'; +import { IndexerStateService } from './indexer.state.service'; +import { IndexerSwapHandlerService } from './event-handlers/indexer.swap.handler.service'; +import { IndexerLiquidityHandlerService } from './event-handlers/indexer.liquidity.handler.service'; +import { IndexerPriceDiscoveryHandlerService } from './event-handlers/indexer.price.discovery.handler.service'; +import { + IndexerEventIdentifiers, + IndexerEventTypes, +} from '../entities/indexer.event.types'; + +@Injectable() +export class IndexerService { + private filterAddresses: string[]; + private data: any[]; + private errorsCount = 0; + private handleSwapEvents = false; + private handleLiquidityEvents = false; + private handlePriceDiscoveryEvents = false; + private eventIdentifiers: string[]; + + constructor( + private readonly elasticEventsService: ElasticSearchEventsService, + private readonly analyticsWrite: AnalyticsWriteService, + private readonly stateService: IndexerStateService, + private readonly swapHandlerService: IndexerSwapHandlerService, + private readonly liquidityHandlerService: IndexerLiquidityHandlerService, + private readonly priceDiscoveryHandlerService: IndexerPriceDiscoveryHandlerService, + @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger, + ) {} + + public async indexAnalytics( + startTimestamp: number, + endTimestamp: number, + eventTypes: IndexerEventTypes[], + ): Promise { + await this.initIndexerState(startTimestamp, eventTypes); + + await this.fetchEvents(startTimestamp, endTimestamp); + + return this.errorsCount; + } + + private async initIndexerState( + startTimestamp: number, + eventTypes: IndexerEventTypes[], + ): Promise { + await this.stateService.initState(startTimestamp); + + this.filterAddresses = []; + this.eventIdentifiers = []; + this.errorsCount = 0; + const pairs = this.stateService.getPairsMetadata(); + + this.filterAddresses.push(...pairs.map((pair) => pair.address)); + this.filterAddresses.push(...scAddress.priceDiscovery); + + this.handleSwapEvents = eventTypes.includes( + IndexerEventTypes.SWAP_EVENTS, + ); + this.handleLiquidityEvents = eventTypes.includes( + IndexerEventTypes.LIQUIDITY_EVENTS, + ); + this.handlePriceDiscoveryEvents = eventTypes.includes( + IndexerEventTypes.PRICE_DISCOVERY_EVENTS, + ); + + if (this.handleSwapEvents) { + this.eventIdentifiers.push( + IndexerEventIdentifiers.SWAP_FIXED_INPUT, + ); + this.eventIdentifiers.push( + IndexerEventIdentifiers.SWAP_FIXED_OUTPUT, + ); + } + + if (this.handleLiquidityEvents) { + this.eventIdentifiers.push(IndexerEventIdentifiers.ADD_LIQUIDITY); + this.eventIdentifiers.push( + IndexerEventIdentifiers.REMOVE_LIQUIDITY, + ); + } + + if (this.handlePriceDiscoveryEvents) { + this.eventIdentifiers.push( + IndexerEventIdentifiers.PRICE_DISCOVERY_DEPOSIT, + ); + this.eventIdentifiers.push( + IndexerEventIdentifiers.PRICE_DISCOVERY_WITHDRAW, + ); + } + } + + private async fetchEvents( + startTimestamp: number, + endTimestamp: number, + ): Promise { + const eventsByTimestamp: Map = new Map(); + + const processEventsAction = async (events: any[]): Promise => { + for (const event of events) { + try { + const rawEvent = convertEventTopicsAndDataToBase64(event); + if (rawEvent.data === '') { + continue; + } + + if (eventsByTimestamp.has(rawEvent.timestamp)) { + const rawEvents = eventsByTimestamp.get( + rawEvent.timestamp, + ); + rawEvents.push(rawEvent); + eventsByTimestamp.set(rawEvent.timestamp, rawEvents); + } else { + eventsByTimestamp.set(rawEvent.timestamp, [rawEvent]); + } + } catch (error) { + if ( + error?.message?.includes('Cannot create address from') + ) { + console.log('Invalid event', event); + } else { + console.log(`Could not process event:`, event); + console.log(error); + } + this.incrementErrorsCount(); + continue; + } + } + + // skip processing for a single timestamp + if (eventsByTimestamp.size <= 1) { + return; + } + + const timestampsCount = eventsByTimestamp.size; + const eventTimestamps = eventsByTimestamp.keys(); + + for (let i = 0; i < timestampsCount - 1; i++) { + const timestamp = eventTimestamps.next().value; + const rawEvents = eventsByTimestamp.get(timestamp); + + await this.processEvents(rawEvents); + + eventsByTimestamp.delete(timestamp); + } + }; + + await this.elasticEventsService.getEventsForAddresses( + this.filterAddresses, + this.eventIdentifiers, + startTimestamp, + endTimestamp, + processEventsAction, + 5000, + ); + + // process remaining batch of events + if (eventsByTimestamp.size > 0) { + for (const rawEvents of eventsByTimestamp.values()) { + await this.processEvents(rawEvents); + } + } + } + + private async processEvents( + rawEvents: RawElasticEventType[] | undefined, + ): Promise { + if (!rawEvents || rawEvents.length === 0) { + return; + } + + this.data = []; + let timestamp: number; + + let eventData: any[] = []; + for (const rawEvent of rawEvents) { + try { + switch (rawEvent.identifier) { + case IndexerEventIdentifiers.SWAP_FIXED_INPUT: + case IndexerEventIdentifiers.SWAP_FIXED_OUTPUT: + if (!this.handleSwapEvents) { + break; + } + [eventData, timestamp] = + this.swapHandlerService.handleSwapEvent( + new SwapEvent(rawEvent), + ); + break; + case IndexerEventIdentifiers.ADD_LIQUIDITY: + if (!this.handleLiquidityEvents) { + break; + } + [eventData, timestamp] = + this.liquidityHandlerService.handleLiquidityEvent( + new AddLiquidityEvent(rawEvent), + ); + break; + case IndexerEventIdentifiers.REMOVE_LIQUIDITY: + if (!this.handleLiquidityEvents) { + break; + } + [eventData, timestamp] = + this.liquidityHandlerService.handleLiquidityEvent( + new RemoveLiquidityEvent(rawEvent), + ); + break; + case IndexerEventIdentifiers.PRICE_DISCOVERY_DEPOSIT: + if (!this.handlePriceDiscoveryEvents) { + break; + } + [eventData, timestamp] = + this.priceDiscoveryHandlerService.handlePriceDiscoveryEvent( + new DepositEvent(rawEvent), + ); + break; + case IndexerEventIdentifiers.PRICE_DISCOVERY_WITHDRAW: + if (!this.handlePriceDiscoveryEvents) { + break; + } + [eventData, timestamp] = + this.priceDiscoveryHandlerService.handlePriceDiscoveryEvent( + new WithdrawEvent(rawEvent), + ); + break; + } + this.updateIngestData(eventData); + } catch (error) { + this.logger.error(error); + this.incrementErrorsCount(); + } + } + + if (Object.keys(this.data).length > 0) { + await this.analyticsWrite.ingest({ + data: this.data, + Time: timestamp, + }); + + this.logger.info( + `Ingested records for ${ + Object.keys(this.data).length + } series at timestamp ${timestamp}. Events processed: ${ + rawEvents.length + }`, + { + context: 'IndexerService', + }, + ); + } + } + + private updateIngestData(eventData: any[]): void { + for (const series of Object.keys(eventData)) { + if (this.data[series] === undefined) { + this.data[series] = {}; + } + + for (const measure of Object.keys(eventData[series])) { + if ( + measure.toLowerCase().includes('volume') || + measure.toLowerCase().includes('fees') + ) { + this.data[series][measure] = this.data[series][measure] + ? new BigNumber(this.data[series][measure]) + .plus(eventData[series][measure]) + .toFixed() + : eventData[series][measure]; + } else { + this.data[series][measure] = eventData[series][measure]; + } + } + } + } + + private incrementErrorsCount(): void { + this.errorsCount += 1; + } +} diff --git a/src/modules/analytics-indexer/services/indexer.state.service.ts b/src/modules/analytics-indexer/services/indexer.state.service.ts new file mode 100644 index 000000000..3be9c36e5 --- /dev/null +++ b/src/modules/analytics-indexer/services/indexer.state.service.ts @@ -0,0 +1,243 @@ +import { Injectable } from '@nestjs/common'; +import { RouterAbiService } from '../../router/services/router.abi.service'; +import { PairService } from '../../pair/services/pair.service'; +import { PairAbiService } from '../../pair/services/pair.abi.service'; +import { AnalyticsQueryService } from 'src/services/analytics/services/analytics.query.service'; +import { Constants } from '@multiversx/sdk-nestjs-common'; +import { TokenService } from 'src/modules/tokens/services/token.service'; +import { GetOrSetCache } from 'src/helpers/decorators/caching.decorator'; +import { scAddress } from 'src/config'; +import { PriceDiscoveryAbiService } from 'src/modules/price-discovery/services/price.discovery.abi.service'; +import { GlobalState } from '../global.state'; +import { PairMetadata } from '../entities/pair.metadata'; +import { EsdtToken } from 'src/modules/tokens/models/esdtToken.model'; +import { PriceDiscoveryMetadata } from '../entities/price.discovery.metadata'; +import { CacheService } from '@multiversx/sdk-nestjs-cache'; + +@Injectable() +export class IndexerStateService { + private pairs: PairMetadata[] = []; + private priceDiscoverySCs: PriceDiscoveryMetadata[] = []; + + constructor( + private readonly routerAbiService: RouterAbiService, + private readonly pairAbiService: PairAbiService, + private readonly pairService: PairService, + private readonly analyticsQueryService: AnalyticsQueryService, + private readonly tokenService: TokenService, + private readonly priceDiscoveryAbi: PriceDiscoveryAbiService, + private readonly cacheService: CacheService, + ) {} + + public async initState(startTimestamp: number): Promise { + this.pairs = []; + + const pairAddresses = await this.allPairAddresses(); + + const allFirstTokens = await this.allFirstTokens(pairAddresses); + const allSecondTokens = await this.allSecondTokens(pairAddresses); + const allFeePercentages = await this.allFeePercentages(pairAddresses); + + const priceDiscoveryAddresses: string[] = scAddress.priceDiscovery; + const pdLaunchedTokens = await this.allPDLaunchedTokens( + priceDiscoveryAddresses, + ); + const pdAcceptedTokens = await this.allPDAcceptedTokens( + priceDiscoveryAddresses, + ); + + for (const [index, pdAddress] of priceDiscoveryAddresses.entries()) { + const priceDiscover = new PriceDiscoveryMetadata({ + address: pdAddress, + launchedToken: pdLaunchedTokens[index], + acceptedToken: pdAcceptedTokens[index], + }); + + this.priceDiscoverySCs.push(priceDiscover); + } + + for (const [index, pairAddress] of pairAddresses.entries()) { + const pair = new PairMetadata({ + address: pairAddress, + firstToken: allFirstTokens[index], + secondToken: allSecondTokens[index], + totalFeePercent: allFeePercentages[index], + }); + + this.pairs.push(pair); + + GlobalState.pairsState[pair.address] = { + firstTokenID: pair.firstToken.identifier, + secondTokenID: pair.secondToken.identifier, + firstTokenReserves: + await this.analyticsQueryService.getLastForMetric( + pair.address, + 'firstTokenLocked', + startTimestamp, + ), + secondTokenReserves: + await this.analyticsQueryService.getLastForMetric( + pair.address, + 'secondTokenLocked', + startTimestamp, + ), + liquidityPoolSupply: + await this.analyticsQueryService.getLastForMetric( + pair.address, + 'liquidity', + startTimestamp, + ), + }; + } + } + + @GetOrSetCache({ + baseKey: 'indexer', + remoteTtl: Constants.oneDay(), + localTtl: Constants.oneDay(), + }) + public async allPairAddresses(): Promise { + return await this.routerAbiService.pairsAddress(); + } + + private async allFirstTokens( + pairAddresses: string[], + ): Promise { + return await this.cacheService.getOrSet( + `indexer.allFirstTokens`, + async () => await this.pairService.getAllFirstTokens(pairAddresses), + Constants.oneDay(), + Constants.oneDay(), + ); + } + + private async allSecondTokens( + pairAddresses: string[], + ): Promise { + return await this.cacheService.getOrSet( + `indexer.allSecondTokens`, + async () => + await this.pairService.getAllSecondTokens(pairAddresses), + Constants.oneDay(), + Constants.oneDay(), + ); + } + + private async allFeePercentages( + pairAddresses: string[], + ): Promise { + return await this.cacheService.getOrSet( + `indexer.allFeePercentages`, + async () => + await Promise.all( + pairAddresses.map((address) => + this.pairAbiService.totalFeePercent(address), + ), + ), + Constants.oneDay(), + Constants.oneDay(), + ); + } + + private async allPDLaunchedTokens( + priceDiscoveryAddresses: string[], + ): Promise { + return await this.cacheService.getOrSet( + `indexer.allPDLaunchedTokens`, + async () => { + const launchedTokenIDs = await Promise.all( + priceDiscoveryAddresses.map((address) => + this.priceDiscoveryAbi.launchedTokenID(address), + ), + ); + return await this.tokenService.getAllTokensMetadata( + launchedTokenIDs, + ); + }, + Constants.oneDay(), + Constants.oneDay(), + ); + } + private async allPDAcceptedTokens( + priceDiscoveryAddresses: string[], + ): Promise { + return await this.cacheService.getOrSet( + `indexer.allPDAcceptedTokens`, + async () => { + const acceptedTokenIDs = await Promise.all( + priceDiscoveryAddresses.map((address) => + this.priceDiscoveryAbi.acceptedTokenID(address), + ), + ); + return await this.tokenService.getAllTokensMetadata( + acceptedTokenIDs, + ); + }, + Constants.oneDay(), + Constants.oneDay(), + ); + } + + @GetOrSetCache({ + baseKey: 'indexer', + remoteTtl: Constants.oneDay(), + localTtl: Constants.oneDay(), + }) + public async getTokenMetadata( + tokenID: string, + ): Promise { + return await this.tokenService.tokenMetadata(tokenID); + } + + public getPairsMetadata(): PairMetadata[] { + return this.pairs; + } + + public getPairMetadata(pairAddress: string): PairMetadata { + return this.pairs.find((pair) => pair.address === pairAddress); + } + + public getFirstToken(pairAddress: string): EsdtToken { + const pair = this.getPairMetadata(pairAddress); + return pair.firstToken; + } + + public getSecondToken(pairAddress: string): EsdtToken { + const pair = this.getPairMetadata(pairAddress); + return pair.secondToken; + } + + public getPairByTokens(token1ID: string, token2ID: string): PairMetadata { + return this.pairs.find( + (p) => + (p.firstToken.identifier === token1ID && + p.secondToken.identifier === token2ID) || + (p.firstToken.identifier === token2ID && + p.secondToken.identifier === token1ID), + ); + } + + public getTokenPairs(tokenID: string): PairMetadata[] { + const tokenPairs: PairMetadata[] = []; + for (const pair of this.pairs) { + if ( + pair.firstToken.identifier === tokenID || + pair.secondToken.identifier === tokenID + ) { + tokenPairs.push(pair); + } + } + return tokenPairs; + } + + public isValidPair(address: string): boolean { + const pair = this.getPairMetadata(address); + return pair !== undefined; + } + + public getPriceDiscoveryMetadata(address: string): PriceDiscoveryMetadata { + return this.priceDiscoverySCs.find( + (priceDiscovery) => priceDiscovery.address === address, + ); + } +} diff --git a/src/modules/analytics-indexer/services/indexer.token.service.ts b/src/modules/analytics-indexer/services/indexer.token.service.ts new file mode 100644 index 000000000..b82a8e397 --- /dev/null +++ b/src/modules/analytics-indexer/services/indexer.token.service.ts @@ -0,0 +1,124 @@ +import { forwardRef, Inject, Injectable } from '@nestjs/common'; +import BigNumber from 'bignumber.js'; +import { + constantsConfig, + mxConfig, + scAddress, + tokenProviderUSD, +} from 'src/config'; +import { IndexerStateService } from './indexer.state.service'; +import { PairMetadata } from '../entities/pair.metadata'; +import { IndexerPairService } from './indexer.pair.service'; + +@Injectable() +export class IndexerTokenService { + constructor( + private readonly stateService: IndexerStateService, + @Inject(forwardRef(() => IndexerPairService)) + private readonly pairService: IndexerPairService, + ) {} + + public computeTokenPriceDerivedUSD(tokenID: string): string { + const egldPriceUSD = this.getEgldPriceInUSD(); + const derivedEGLD = this.computeTokenPriceDerivedEGLD(tokenID, []); + + return new BigNumber(derivedEGLD).times(egldPriceUSD).toFixed(); + } + + getEgldPriceInUSD(): string { + return this.pairService.computeFirstTokenPrice(scAddress.WEGLD_USDC); + } + + private computeTokenPriceDerivedEGLD( + tokenID: string, + pairsNotToVisit: PairMetadata[], + ): string { + if (tokenID === tokenProviderUSD) { + return new BigNumber('1').toFixed(); + } + + let tokenPairs = this.stateService.getTokenPairs(tokenID); + + tokenPairs = tokenPairs.filter( + (pair) => + pairsNotToVisit.find( + (pairNotToVisit) => pairNotToVisit.address === pair.address, + ) === undefined, + ); + + pairsNotToVisit.push(...tokenPairs); + + let largestLiquidityEGLD = new BigNumber(0); + let priceSoFar = '0'; + + if (tokenID === constantsConfig.USDC_TOKEN_ID) { + const egldPriceUSD = this.getEgldPriceInUSD(); + priceSoFar = new BigNumber(1).dividedBy(egldPriceUSD).toFixed(); + } else { + for (const pair of tokenPairs) { + const liquidity = this.pairService.getTotalSupply(pair.address); + + if (new BigNumber(liquidity).isZero()) { + continue; + } + + const { firstTokenReserves, secondTokenReserves } = + this.pairService.getPairState(pair.address); + + if (pair.firstToken.identifier === tokenID) { + const secondTokenDerivedEGLD = + this.computeTokenPriceDerivedEGLD( + pair.secondToken.identifier, + pairsNotToVisit, + ); + + const firstTokenPrice = + this.pairService.computeFirstTokenPrice(pair.address); + + // const egldLocked = new BigNumber(secondTokenReserves).times( + // secondTokenDerivedEGLD, + // ); + const egldLocked = new BigNumber(secondTokenReserves) + .times(`1e-${pair.secondToken.decimals}`) + .times(secondTokenDerivedEGLD) + .times(`1e${mxConfig.EGLDDecimals}`) + .integerValue(); + + if (egldLocked.isGreaterThan(largestLiquidityEGLD)) { + largestLiquidityEGLD = egldLocked; + priceSoFar = new BigNumber(firstTokenPrice) + .times(secondTokenDerivedEGLD) + .toFixed(); + } + } + + if (pair.secondToken.identifier === tokenID) { + const firstTokenDerivedEGLD = + this.computeTokenPriceDerivedEGLD( + pair.firstToken.identifier, + pairsNotToVisit, + ); + + const secondTokenPrice = + this.pairService.computeSecondTokenPrice(pair.address); + + // const egldLocked = new BigNumber(firstTokenReserves).times( + // firstTokenDerivedEGLD, + // ); + const egldLocked = new BigNumber(firstTokenReserves) + .times(`1e-${pair.firstToken.decimals}`) + .times(firstTokenDerivedEGLD) + .times(`1e${mxConfig.EGLDDecimals}`) + .integerValue(); + if (egldLocked.isGreaterThan(largestLiquidityEGLD)) { + largestLiquidityEGLD = egldLocked; + priceSoFar = new BigNumber(secondTokenPrice) + .times(firstTokenDerivedEGLD) + .toFixed(); + } + } + } + } + return priceSoFar; + } +} diff --git a/src/services/analytics/interfaces/analytics.query.interface.ts b/src/services/analytics/interfaces/analytics.query.interface.ts index c08ff9f84..524583d3c 100644 --- a/src/services/analytics/interfaces/analytics.query.interface.ts +++ b/src/services/analytics/interfaces/analytics.query.interface.ts @@ -8,6 +8,12 @@ import { AnalyticsQueryArgs } from '../entities/analytics.query.args'; export interface AnalyticsQueryInterface { getAggregatedValue(args: AnalyticsQueryArgs): Promise; + getLastForMetric( + series: string, + metric: string, + time: number, + ): Promise; + getLatestCompleteValues( args: AnalyticsQueryArgs, ): Promise; diff --git a/src/services/analytics/mocks/analytics.query.service.mock.ts b/src/services/analytics/mocks/analytics.query.service.mock.ts index 545cbe7ea..928e63b03 100644 --- a/src/services/analytics/mocks/analytics.query.service.mock.ts +++ b/src/services/analytics/mocks/analytics.query.service.mock.ts @@ -22,6 +22,13 @@ export class AnalyticsQueryServiceMock implements AnalyticsQueryInterface { getAggregatedValue(args: AnalyticsQueryArgs): Promise { throw new Error('Method not implemented.'); } + getLastForMetric( + series: string, + metric: string, + time: number, + ): Promise { + throw new Error('Method not implemented.'); + } getLatestCompleteValues( args: AnalyticsQueryArgs, ): Promise { diff --git a/src/services/analytics/services/analytics.query.service.ts b/src/services/analytics/services/analytics.query.service.ts index 8251f0d17..98abf8941 100644 --- a/src/services/analytics/services/analytics.query.service.ts +++ b/src/services/analytics/services/analytics.query.service.ts @@ -21,6 +21,15 @@ export class AnalyticsQueryService implements AnalyticsQueryInterface { return await service.getAggregatedValue(args); } + async getLastForMetric( + series: string, + metric: string, + time: number, + ): Promise { + const service = await this.getService(); + return await service.getLastForMetric(series, metric, time); + } + async getLatestCompleteValues(args: { series: any; metric: any; diff --git a/src/services/analytics/timescaledb/timescaledb.query.service.ts b/src/services/analytics/timescaledb/timescaledb.query.service.ts index 945197c2c..164fca850 100644 --- a/src/services/analytics/timescaledb/timescaledb.query.service.ts +++ b/src/services/analytics/timescaledb/timescaledb.query.service.ts @@ -117,6 +117,37 @@ export class TimescaleDBQueryService implements AnalyticsQueryInterface { return query?.sum ?? '0'; } + @TimescaleDBQuery() + async getLastForMetric( + series: string, + metric: string, + time: number, + ): Promise { + try { + const endDate = moment.unix(time).utc().toDate(); + + const query = this.dexAnalytics + .createQueryBuilder() + .select('value') + .where('series = :series', { series }) + .andWhere('key = :metric', { metric }) + .andWhere(`timestamp <= :endDate`, { endDate }) + .orderBy('timestamp', 'DESC') + .limit(1); + const last = await query.getRawOne(); + + return last?.value ?? '0'; + } catch (error) { + this.logger.error('getLastForMetric', { + series, + metric, + time, + error, + }); + throw error; + } + } + @TimescaleDBQuery() async getLatestCompleteValues({ series, diff --git a/src/services/elastic-search/services/es.events.service.ts b/src/services/elastic-search/services/es.events.service.ts index 6642ec2cb..93f513c91 100644 --- a/src/services/elastic-search/services/es.events.service.ts +++ b/src/services/elastic-search/services/es.events.service.ts @@ -194,4 +194,55 @@ export class ElasticSearchEventsService { action, ); } + + async getEventsForAddresses( + addresses: string[], + eventIdentifiers: string[], + startTimestamp: number, + endTimestamp: number, + action: (items: any[]) => Promise, + size = 100, + ): Promise { + const pagination = new ElasticPagination(); + pagination.size = size; + + const elasticQueryAdapter: ElasticQuery = + new ElasticQuery().withPagination(pagination); + + elasticQueryAdapter.condition.must = [ + QueryType.Should( + eventIdentifiers.map((identifier) => + QueryType.Match('identifier', identifier), + ), + ), + QueryType.Should( + addresses.map((address) => QueryType.Match('address', address)), + ), + ]; + + elasticQueryAdapter.filter = [ + QueryType.Range( + 'timestamp', + { + key: 'gte', + value: startTimestamp, + }, + { + key: 'lte', + value: endTimestamp, + }, + ), + ]; + + elasticQueryAdapter.sort = [ + { name: 'timestamp', order: ElasticSortOrder.ascending }, + ]; + + await this.elasticService.getScrollableList( + 'events', + '', + elasticQueryAdapter, + action, + ); + } }